Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic broadcast assistant functionality #529

Merged
merged 7 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
417 changes: 351 additions & 66 deletions apps/auracast.py

Large diffs are not rendered by default.

230 changes: 230 additions & 0 deletions apps/device_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
from typing import Callable, Iterable, Optional

import click

from bumble.core import ProtocolError
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.gatt import Service
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.profiles.battery_service import BatteryServiceProxy
from bumble.profiles.gap import GenericAccessServiceProxy
from bumble.profiles.tmap import TelephonyAndMediaAudioServiceProxy
from bumble.transport import open_transport_or_link


# -----------------------------------------------------------------------------
async def try_show(function: Callable, *args, **kwargs) -> None:
try:
await function(*args, **kwargs)
except ProtocolError as error:
print(color('ERROR:', 'red'), error)


# -----------------------------------------------------------------------------
def show_services(services: Iterable[Service]) -> None:
for service in services:
print(color(str(service), 'cyan'))

for characteristic in service.characteristics:
print(color(' ' + str(characteristic), 'magenta'))


# -----------------------------------------------------------------------------
async def show_gap_information(
gap_service: GenericAccessServiceProxy,
):
print(color('### Generic Access Profile', 'yellow'))

if gap_service.device_name:
print(
color(' Device Name:', 'green'),
await gap_service.device_name.read_value(),
)

if gap_service.appearance:
print(
color(' Appearance: ', 'green'),
await gap_service.appearance.read_value(),
)

print()


# -----------------------------------------------------------------------------
async def show_device_information(
device_information_service: DeviceInformationServiceProxy,
):
print(color('### Device Information', 'yellow'))

if device_information_service.manufacturer_name:
print(
color(' Manufacturer Name:', 'green'),
await device_information_service.manufacturer_name.read_value(),
)

if device_information_service.model_number:
print(
color(' Model Number: ', 'green'),
await device_information_service.model_number.read_value(),
)

if device_information_service.serial_number:
print(
color(' Serial Number: ', 'green'),
await device_information_service.serial_number.read_value(),
)

if device_information_service.firmware_revision:
print(
color(' Firmware Revision:', 'green'),
await device_information_service.firmware_revision.read_value(),
)

print()


# -----------------------------------------------------------------------------
async def show_battery_level(
battery_service: BatteryServiceProxy,
):
print(color('### Battery Information', 'yellow'))

if battery_service.battery_level:
print(
color(' Battery Level:', 'green'),
await battery_service.battery_level.read_value(),
)

print()


# -----------------------------------------------------------------------------
async def show_tmas(
tmas: TelephonyAndMediaAudioServiceProxy,
):
print(color('### Telephony And Media Audio Service', 'yellow'))

if tmas.role:
print(
color(' Role:', 'green'),
await tmas.role.read_value(),
)

print()


# -----------------------------------------------------------------------------
async def show_device_info(peer, done: Optional[asyncio.Future]) -> None:
try:
# Discover all services
print(color('### Discovering Services and Characteristics', 'magenta'))
await peer.discover_services()
for service in peer.services:
await service.discover_characteristics()

print(color('=== Services ===', 'yellow'))
show_services(peer.services)
print()

if gap_service := peer.create_service_proxy(GenericAccessServiceProxy):
await try_show(show_gap_information, gap_service)

if device_information_service := peer.create_service_proxy(
DeviceInformationServiceProxy
):
await try_show(show_device_information, device_information_service)

if battery_service := peer.create_service_proxy(BatteryServiceProxy):
await try_show(show_battery_level, battery_service)

if tmas := peer.create_service_proxy(TelephonyAndMediaAudioServiceProxy):
await try_show(show_tmas, tmas)

if done is not None:
done.set_result(None)
except asyncio.CancelledError:
print(color('!!! Operation canceled', 'red'))


# -----------------------------------------------------------------------------
async def async_main(device_config, encrypt, transport, address_or_name):
async with await open_transport_or_link(transport) as (hci_source, hci_sink):

# Create a device
if device_config:
device = Device.from_config_file_with_hci(
device_config, hci_source, hci_sink
)
else:
device = Device.with_hci(
'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
)
await device.power_on()

if address_or_name:
# Connect to the target peer
print(color('>>> Connecting...', 'green'))
connection = await device.connect(address_or_name)
print(color('>>> Connected', 'green'))

# Encrypt the connection if required
if encrypt:
print(color('+++ Encrypting connection...', 'blue'))
await connection.encrypt()
print(color('+++ Encryption established', 'blue'))

await show_device_info(Peer(connection), None)
else:
# Wait for a connection
done = asyncio.get_running_loop().create_future()
device.on(
'connection',
lambda connection: asyncio.create_task(
show_device_info(Peer(connection), done)
),
)
await device.start_advertising(auto_restart=True)

print(color('### Waiting for connection...', 'blue'))
await done


# -----------------------------------------------------------------------------
@click.command()
@click.option('--device-config', help='Device configuration', type=click.Path())
@click.option('--encrypt', help='Encrypt the connection', is_flag=True, default=False)
@click.argument('transport')
@click.argument('address-or-name', required=False)
def main(device_config, encrypt, transport, address_or_name):
"""
Dump the GATT database on a remote device. If ADDRESS_OR_NAME is not specified,
wait for an incoming connection.
"""
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(async_main(device_config, encrypt, transport, address_or_name))


# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()
4 changes: 4 additions & 0 deletions apps/gatt_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,15 @@ async def async_main(device_config, encrypt, transport, address_or_name):

if address_or_name:
# Connect to the target peer
print(color('>>> Connecting...', 'green'))
connection = await device.connect(address_or_name)
print(color('>>> Connected', 'green'))

# Encrypt the connection if required
if encrypt:
print(color('+++ Encrypting connection...', 'blue'))
await connection.encrypt()
print(color('+++ Encryption established', 'blue'))

await dump_gatt_db(Peer(connection), None)
else:
Expand Down
33 changes: 16 additions & 17 deletions apps/lea_unicast/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import wasmtime
import wasmtime.loader
import liblc3 # type: ignore
import logging

import click
import aiohttp.web
Expand All @@ -43,7 +42,7 @@
from bumble.colors import color
from bumble.device import Device, DeviceConfiguration, AdvertisingParameters
from bumble.transport import open_transport
from bumble.profiles import bap
from bumble.profiles import ascs, bap, pacs
from bumble.hci import Address, CodecID, CodingFormat, HCI_IsoDataPacket

# -----------------------------------------------------------------------------
Expand All @@ -57,8 +56,8 @@
DEFAULT_UI_PORT = 7654


def _sink_pac_record() -> bap.PacRecord:
return bap.PacRecord(
def _sink_pac_record() -> pacs.PacRecord:
return pacs.PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=bap.CodecSpecificCapabilities(
supported_sampling_frequencies=(
Expand All @@ -79,8 +78,8 @@ def _sink_pac_record() -> bap.PacRecord:
)


def _source_pac_record() -> bap.PacRecord:
return bap.PacRecord(
def _source_pac_record() -> pacs.PacRecord:
return pacs.PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=bap.CodecSpecificCapabilities(
supported_sampling_frequencies=(
Expand Down Expand Up @@ -447,7 +446,7 @@ async def run(self) -> None:
)

self.device.add_service(
bap.PublishedAudioCapabilitiesService(
pacs.PublishedAudioCapabilitiesService(
supported_source_context=bap.ContextType(0xFFFF),
available_source_context=bap.ContextType(0xFFFF),
supported_sink_context=bap.ContextType(0xFFFF), # All context types
Expand All @@ -461,10 +460,10 @@ async def run(self) -> None:
)
)

ascs = bap.AudioStreamControlService(
ascs_service = ascs.AudioStreamControlService(
self.device, sink_ase_id=[1], source_ase_id=[2]
)
self.device.add_service(ascs)
self.device.add_service(ascs_service)

advertising_data = bytes(
AdvertisingData(
Expand All @@ -479,13 +478,13 @@ async def run(self) -> None:
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(bap.PublishedAudioCapabilitiesService.UUID),
bytes(pacs.PublishedAudioCapabilitiesService.UUID),
),
]
)
) + bytes(bap.UnicastServerAdvertisingData())

def on_pdu(pdu: HCI_IsoDataPacket, ase: bap.AseStateMachine):
def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine):
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
pcm = decode(
Expand All @@ -495,12 +494,12 @@ def on_pdu(pdu: HCI_IsoDataPacket, ase: bap.AseStateMachine):
)
self.device.abort_on('disconnection', self.ui_server.send_audio(pcm))

def on_ase_state_change(ase: bap.AseStateMachine) -> None:
if ase.state == bap.AseStateMachine.State.STREAMING:
def on_ase_state_change(ase: ascs.AseStateMachine) -> None:
if ase.state == ascs.AseStateMachine.State.STREAMING:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
assert ase.cis_link
if ase.role == bap.AudioRole.SOURCE:
if ase.role == ascs.AudioRole.SOURCE:
ase.cis_link.abort_on(
'disconnection',
lc3_source_task(
Expand All @@ -516,10 +515,10 @@ def on_ase_state_change(ase: bap.AseStateMachine) -> None:
)
else:
ase.cis_link.sink = functools.partial(on_pdu, ase=ase)
elif ase.state == bap.AseStateMachine.State.CODEC_CONFIGURED:
elif ase.state == ascs.AseStateMachine.State.CODEC_CONFIGURED:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
if ase.role == bap.AudioRole.SOURCE:
if ase.role == ascs.AudioRole.SOURCE:
setup_encoders(
codec_config.sampling_frequency.hz,
codec_config.frame_duration.us,
Expand All @@ -532,7 +531,7 @@ def on_ase_state_change(ase: bap.AseStateMachine) -> None:
codec_config.audio_channel_allocation.channel_count,
)

for ase in ascs.ase_state_machines.values():
for ase in ascs_service.ase_state_machines.values():
ase.on('state_change', functools.partial(on_ase_state_change, ase=ase))

await self.device.power_on()
Expand Down
4 changes: 4 additions & 0 deletions bumble/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ class InvalidOperationError(BaseBumbleError, RuntimeError):
"""Invalid Operation Error"""


class NotSupportedError(BaseBumbleError, RuntimeError):
"""Not Supported"""


class OutOfResourcesError(BaseBumbleError, RuntimeError):
"""Out of Resources Error"""

Expand Down
Loading
Loading