diff --git a/bumble/drivers/__init__.py b/bumble/drivers/__init__.py new file mode 100644 index 00000000..2decab71 --- /dev/null +++ b/bumble/drivers/__init__.py @@ -0,0 +1,68 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Drivers that can be used to customize the interaction between a host and a controller, +like loading firmware after a cold start. +""" + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import abc +import logging +from . import rtk + + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Classes +# ----------------------------------------------------------------------------- +class Driver(abc.ABC): + """Base class for drivers.""" + + @staticmethod + async def for_host(_host): + """Return a driver instance for a host. + + Args: + host: Host object for which a driver should be created. + + Returns: + A Driver instance if a driver should be instantiated for this host, or + None if no driver instance of this class is needed. + """ + return None + + @abc.abstractmethod + async def init_controller(self): + """Initialize the controller.""" + + +# ----------------------------------------------------------------------------- +# Functions +# ----------------------------------------------------------------------------- +async def get_driver_for_host(host): + """Probe all known diver classes until one returns a valid instance for a host, + or none is found. + """ + if driver := await rtk.Driver.for_host(host): + logger.debug("Instantiated RTK driver") + return driver + + return None diff --git a/bumble/drivers/rtk.py b/bumble/drivers/rtk.py new file mode 100644 index 00000000..c0bccc92 --- /dev/null +++ b/bumble/drivers/rtk.py @@ -0,0 +1,647 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Support for Realtek USB dongles. +Based on various online bits of information, including the Linux kernel. +(see `drivers/bluetooth/btrtl.c`) +""" + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from dataclasses import dataclass +import asyncio +import enum +import logging +import math +import os +import pathlib +import platform +import struct +from typing import Tuple +import weakref + + +from bumble.hci import ( + hci_command_op_code, + STATUS_SPEC, + HCI_SUCCESS, + HCI_COMMAND_NAMES, + HCI_Command, + HCI_Reset_Command, + HCI_Read_Local_Version_Information_Command, +) + + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +RTK_ROM_LMP_8723A = 0x1200 +RTK_ROM_LMP_8723B = 0x8723 +RTK_ROM_LMP_8821A = 0x8821 +RTK_ROM_LMP_8761A = 0x8761 +RTK_ROM_LMP_8822B = 0x8822 +RTK_ROM_LMP_8852A = 0x8852 +RTK_CONFIG_MAGIC = 0x8723AB55 + +RTK_EPATCH_SIGNATURE = b"Realtech" + +RTK_FRAGMENT_LENGTH = 252 + +RTK_FIRMWARE_DIR_ENV = "BUMBLE_RTK_FIRMWARE_DIR" +RTK_LINUX_FIRMWARE_DIR = "/lib/firmware/rtl_bt" + + +class RtlProjectId(enum.IntEnum): + PROJECT_ID_8723A = 0 + PROJECT_ID_8723B = 1 + PROJECT_ID_8821A = 2 + PROJECT_ID_8761A = 3 + PROJECT_ID_8822B = 8 + PROJECT_ID_8723D = 9 + PROJECT_ID_8821C = 10 + PROJECT_ID_8822C = 13 + PROJECT_ID_8761B = 14 + PROJECT_ID_8852A = 18 + PROJECT_ID_8852B = 20 + PROJECT_ID_8852C = 25 + + +RTK_PROJECT_ID_TO_ROM = { + 0: RTK_ROM_LMP_8723A, + 1: RTK_ROM_LMP_8723B, + 2: RTK_ROM_LMP_8821A, + 3: RTK_ROM_LMP_8761A, + 8: RTK_ROM_LMP_8822B, + 9: RTK_ROM_LMP_8723B, + 10: RTK_ROM_LMP_8821A, + 13: RTK_ROM_LMP_8822B, + 14: RTK_ROM_LMP_8761A, + 18: RTK_ROM_LMP_8852A, + 20: RTK_ROM_LMP_8852A, + 25: RTK_ROM_LMP_8852A, +} + +# List of USB (VendorID, ProductID) for Realtek-based devices. +RTK_USB_PRODUCTS = { + # Realtek 8723AE + (0x0930, 0x021D), + (0x13D3, 0x3394), + # Realtek 8723BE + (0x0489, 0xE085), + (0x0489, 0xE08B), + (0x04F2, 0xB49F), + (0x13D3, 0x3410), + (0x13D3, 0x3416), + (0x13D3, 0x3459), + (0x13D3, 0x3494), + # Realtek 8723BU + (0x7392, 0xA611), + # Realtek 8723DE + (0x0BDA, 0xB009), + (0x2FF8, 0xB011), + # Realtek 8761BUV + (0x0B05, 0x190E), + (0x0BDA, 0x8771), + (0x2230, 0x0016), + (0x2357, 0x0604), + (0x2550, 0x8761), + (0x2B89, 0x8761), + (0x7392, 0xC611), + # Realtek 8821AE + (0x0B05, 0x17DC), + (0x13D3, 0x3414), + (0x13D3, 0x3458), + (0x13D3, 0x3461), + (0x13D3, 0x3462), + # Realtek 8821CE + (0x0BDA, 0xB00C), + (0x0BDA, 0xC822), + (0x13D3, 0x3529), + # Realtek 8822BE + (0x0B05, 0x185C), + (0x13D3, 0x3526), + # Realtek 8822CE + (0x04C5, 0x161F), + (0x04CA, 0x4005), + (0x0B05, 0x18EF), + (0x0BDA, 0xB00C), + (0x0BDA, 0xC123), + (0x0BDA, 0xC822), + (0x0CB5, 0xC547), + (0x1358, 0xC123), + (0x13D3, 0x3548), + (0x13D3, 0x3549), + (0x13D3, 0x3553), + (0x13D3, 0x3555), + (0x2FF8, 0x3051), + # Realtek 8822CU + (0x13D3, 0x3549), + # Realtek 8852AE + (0x04C5, 0x165C), + (0x04CA, 0x4006), + (0x0BDA, 0x2852), + (0x0BDA, 0x385A), + (0x0BDA, 0x4852), + (0x0BDA, 0xC852), + (0x0CB8, 0xC549), + # Realtek 8852BE + (0x0BDA, 0x887B), + (0x0CB8, 0xC559), + (0x13D3, 0x3571), + # Realtek 8852CE + (0x04C5, 0x1675), + (0x04CA, 0x4007), + (0x0CB8, 0xC558), + (0x13D3, 0x3586), + (0x13D3, 0x3587), + (0x13D3, 0x3592), +} + +# ----------------------------------------------------------------------------- +# HCI Commands +# ----------------------------------------------------------------------------- +HCI_RTK_READ_ROM_VERSION_COMMAND = hci_command_op_code(0x3F, 0x6D) +HCI_COMMAND_NAMES[HCI_RTK_READ_ROM_VERSION_COMMAND] = "HCI_RTK_READ_ROM_VERSION_COMMAND" + + +@HCI_Command.command(return_parameters_fields=[("status", STATUS_SPEC), ("version", 1)]) +class HCI_RTK_Read_ROM_Version_Command(HCI_Command): + pass + + +HCI_RTK_DOWNLOAD_COMMAND = hci_command_op_code(0x3F, 0x20) +HCI_COMMAND_NAMES[HCI_RTK_DOWNLOAD_COMMAND] = "HCI_RTK_DOWNLOAD_COMMAND" + + +@HCI_Command.command( + fields=[("index", 1), ("payload", RTK_FRAGMENT_LENGTH)], + return_parameters_fields=[("status", STATUS_SPEC), ("index", 1)], +) +class HCI_RTK_Download_Command(HCI_Command): + pass + + +HCI_RTK_DROP_FIRMWARE_COMMAND = hci_command_op_code(0x3F, 0x66) +HCI_COMMAND_NAMES[HCI_RTK_DROP_FIRMWARE_COMMAND] = "HCI_RTK_DROP_FIRMWARE_COMMAND" + + +@HCI_Command.command() +class HCI_RTK_Drop_Firmware_Command(HCI_Command): + pass + + +# ----------------------------------------------------------------------------- +class Firmware: + def __init__(self, firmware): + extension_sig = bytes([0x51, 0x04, 0xFD, 0x77]) + + if not firmware.startswith(RTK_EPATCH_SIGNATURE): + raise ValueError("Firmware does not start with epatch signature") + + if not firmware.endswith(extension_sig): + raise ValueError("Firmware does not end with extension sig") + + # The firmware should start with a 14 byte header. + epatch_header_size = 14 + if len(firmware) < epatch_header_size: + raise ValueError("Firmware too short") + + # Look for the "project ID", starting from the end. + offset = len(firmware) - len(extension_sig) + project_id = -1 + while offset >= epatch_header_size: + length, opcode = firmware[offset - 2 : offset] + offset -= 2 + + if opcode == 0xFF: + # End + break + + if length == 0: + raise ValueError("Invalid 0-length instruction") + + if opcode == 0 and length == 1: + project_id = firmware[offset - 1] + break + + offset -= length + + if project_id < 0: + raise ValueError("Project ID not found") + + self.project_id = project_id + + # Read the patch tables info. + self.version, num_patches = struct.unpack("... (16 bits each) + # ... (16 bits each) + # ... (32 bits each) + if epatch_header_size + 8 * num_patches > len(firmware): + raise ValueError("Firmware too short") + chip_id_table_offset = epatch_header_size + patch_length_table_offset = chip_id_table_offset + 2 * num_patches + patch_offset_table_offset = chip_id_table_offset + 4 * num_patches + for patch_index in range(num_patches): + chip_id_offset = chip_id_table_offset + 2 * patch_index + (chip_id,) = struct.unpack_from(" len(firmware): + raise ValueError("Firmware too short") + + # Get the SVN version for the patch + (svn_version,) = struct.unpack_from( + "= 0x80: + download_index += 1 + if fragment_index == fragment_count - 1: + download_index |= 0x80 # End marker. + fragment_offset = fragment_index * RTK_FRAGMENT_LENGTH + fragment = payload[fragment_offset : fragment_offset + RTK_FRAGMENT_LENGTH] + logger.debug(f"downloading fragment {fragment_index}") + await self.host.send_command( + HCI_RTK_Download_Command( + index=download_index, payload=fragment, check_result=True + ) + ) + + logger.debug("download complete!") + + # Read the version again + response = await self.host.send_command( + HCI_RTK_Read_ROM_Version_Command(), check_result=True + ) + if response.return_parameters.status != HCI_SUCCESS: + logger.warning("can't get ROM version") + else: + rom_version = response.return_parameters.version + logger.debug(f"ROM version after download: {rom_version:04X}") + + async def download_firmware(self): + if self.driver_info.rom == RTK_ROM_LMP_8723A: + return await self.download_for_rtl8723a() + + if self.driver_info.rom in ( + RTK_ROM_LMP_8723B, + RTK_ROM_LMP_8821A, + RTK_ROM_LMP_8761A, + RTK_ROM_LMP_8822B, + RTK_ROM_LMP_8852A, + ): + return await self.download_for_rtl8723b() + + raise ValueError("ROM not supported") + + async def init_controller(self): + await self.download_firmware() + await self.host.send_command(HCI_Reset_Command(), check_result=True) + logger.info(f"loaded FW image {self.driver_info.fw_name}") diff --git a/bumble/hci.py b/bumble/hci.py index 97ec0cb0..cba207dc 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -1641,9 +1641,11 @@ def format_fields(hci_object, keys, indentation='', value_mappers=None): # Get the value for the field value = hci_object[key] - # Map the value if needed + # Check if there's a matching mapper passed if value_mappers: value_mapper = value_mappers.get(key, value_mapper) + + # Map the value if we have a mapper if value_mapper is not None: value = value_mapper(value) diff --git a/bumble/host.py b/bumble/host.py index a33efc80..e41fd021 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -23,6 +23,7 @@ from bumble.colors import color from bumble.l2cap import L2CAP_PDU from bumble.snoop import Snooper +from bumble import drivers from typing import Optional @@ -116,6 +117,7 @@ def __init__(self, controller_source=None, controller_sink=None): super().__init__() self.hci_sink = None + self.hci_metadata = None self.ready = False # True when we can accept incoming packets self.reset_done = False self.connections = {} # Connections, by connection handle @@ -141,6 +143,9 @@ def __init__(self, controller_source=None, controller_sink=None): # Connect to the source and sink if specified if controller_source: controller_source.set_packet_sink(self) + self.hci_metadata = getattr( + controller_source, 'metadata', self.hci_metadata + ) if controller_sink: self.set_packet_sink(controller_sink) @@ -170,7 +175,7 @@ async def flush(self) -> None: self.emit('flush') self.command_semaphore.release() - async def reset(self): + async def reset(self, driver_factory=drivers.get_driver_for_host): if self.ready: self.ready = False await self.flush() @@ -178,6 +183,15 @@ async def reset(self): await self.send_command(HCI_Reset_Command(), check_result=True) self.ready = True + # Instantiate and init a driver for the host if needed. + # NOTE: we don't keep a reference to the driver here, because we don't + # currently have a need for the driver later on. But if the driver interface + # evolves, it may be required, then, to store a reference to the driver in + # an object property. + if driver_factory is not None: + if driver := await driver_factory(self): + await driver.init_controller() + response = await self.send_command( HCI_Read_Local_Supported_Commands_Command(), check_result=True ) @@ -298,7 +312,7 @@ def send_hci_packet(self, packet): if self.snooper: self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER) - self.hci_sink.on_packet(packet.to_bytes()) + self.hci_sink.on_packet(bytes(packet)) async def send_command(self, command, check_result=False): logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}') @@ -350,7 +364,7 @@ async def send_command(command): asyncio.create_task(send_command(command)) def send_l2cap_pdu(self, connection_handle, cid, pdu): - l2cap_pdu = L2CAP_PDU(cid, pdu).to_bytes() + l2cap_pdu = bytes(L2CAP_PDU(cid, pdu)) # Send the data to the controller via ACL packets bytes_remaining = len(l2cap_pdu) diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index 68c5a6f2..13cad609 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -206,10 +206,11 @@ async def terminate(self): logger.debug('OUT transfer likely already completed') class UsbPacketSource(asyncio.Protocol, ParserSource): - def __init__(self, context, device, acl_in, events_in): + def __init__(self, context, device, metadata, acl_in, events_in): super().__init__() self.context = context self.device = device + self.metadata = metadata self.acl_in = acl_in self.events_in = events_in self.loop = asyncio.get_running_loop() @@ -510,6 +511,10 @@ def find_endpoints(device): f'events_in=0x{events_in:02X}, ' ) + device_metadata = { + 'vendor_id': found.getVendorID(), + 'product_id': found.getProductID(), + } device = found.open() # Auto-detach the kernel driver if supported @@ -535,7 +540,7 @@ def find_endpoints(device): except usb1.USBError: logger.warning('failed to set configuration') - source = UsbPacketSource(context, device, acl_in, events_in) + source = UsbPacketSource(context, device, device_metadata, acl_in, events_in) sink = UsbPacketSink(device, acl_out) return UsbTransport(context, device, interface, setting, source, sink) except usb1.USBError as error: diff --git a/docs/mkdocs/mkdocs.yml b/docs/mkdocs/mkdocs.yml index fde6b40b..0cf65f11 100644 --- a/docs/mkdocs/mkdocs.yml +++ b/docs/mkdocs/mkdocs.yml @@ -36,6 +36,9 @@ nav: - HCI Socket: transports/hci_socket.md - Android Emulator: transports/android_emulator.md - File: transports/file.md + - Drivers: + - Overview: drivers/index.md + - Realtek: drivers/realtek.md - API: - Guide: api/guide.md - Examples: api/examples.md diff --git a/docs/mkdocs/src/drivers/index.md b/docs/mkdocs/src/drivers/index.md new file mode 100644 index 00000000..a904e006 --- /dev/null +++ b/docs/mkdocs/src/drivers/index.md @@ -0,0 +1,10 @@ +DRIVERS +======= + +Some Bluetooth controllers require a driver to function properly. +This may include, for instance, loading a Firmware image or patch, +loading a configuration. + +Drivers included in the module are: + + * [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles. \ No newline at end of file diff --git a/docs/mkdocs/src/drivers/realtek.md b/docs/mkdocs/src/drivers/realtek.md new file mode 100644 index 00000000..acbce490 --- /dev/null +++ b/docs/mkdocs/src/drivers/realtek.md @@ -0,0 +1,62 @@ +REALTEK DRIVER +============== + +This driver supports loading firmware images and optional config data to +USB dongles with a Realtek chipset. +A number of USB dongles are supported, but likely not all. +When using a USB dongle, the USB product ID and manufacturer ID are used +to find whether a matching set of firmware image and config data +is needed for that specific model. If a match exists, the driver will try +load the firmware image and, if needed, config data. +The driver will look for those files by name, in order, in: + + * The directory specified by the environment variable `BUMBLE_RTK_FIRMWARE_DIR` + if set. + * The directory `/drivers/rtk_fw` where `` is the directory + where the `bumble` package is installed. + * The current directory. + + +Obtaining Firmware Images and Config Data +----------------------------------------- + +Firmware images and config data may be obtained from a variety of online +sources. +To facilitate finding a downloading the, the utility program `bumble-rtk-fw-download` +may be used. + +``` +Usage: bumble-rtk-fw-download [OPTIONS] + + Download RTK firmware images and configs. + +Options: + --output-dir TEXT Output directory where the files will be + saved [default: .] + --source [linux-kernel|realtek-opensource|linux-from-scratch] + [default: linux-kernel] + --single TEXT Only download a single image set, by its + base name + --force Overwrite files if they already exist + --parse Parse the FW image after saving + --help Show this message and exit. +``` + +Utility +------- + +The `bumble-rtk-util` utility may be used to interact with a Realtek USB dongle +and/or firmware images. + +``` +Usage: bumble-rtk-util [OPTIONS] COMMAND [ARGS]... + +Options: + --help Show this message and exit. + +Commands: + drop Drop a firmware image from the USB dongle. + info Get the firmware info from a USB dongle. + load Load a firmware image into the USB dongle. + parse Parse a firmware image. +``` \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 21324381..a7a09d63 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,11 +24,12 @@ url = https://github.com/google/bumble [options] python_requires = >=3.8 -packages = bumble, bumble.transport, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora +packages = bumble, bumble.transport, bumble.drivers, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora, bumble.tools package_dir = bumble = bumble bumble.apps = apps -include-package-data = True + bumble.tools = tools +include_package_data = True install_requires = aiohttp ~= 3.8; platform_system!='Emscripten' appdirs >= 1.4 @@ -64,6 +65,8 @@ console_scripts = bumble-bench = bumble.apps.bench:main bumble-speaker = bumble.apps.speaker.speaker:main bumble-pandora-server = bumble.apps.pandora_server:main + bumble-rtk-util = bumble.tools.rtk_util:main + bumble-rtk-fw-download = bumble.tools.rtk_fw_download:main [options.package_data] * = py.typed, *.pyi diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/utils/generate_company_id_list.py b/tools/generate_company_id_list.py similarity index 100% rename from utils/generate_company_id_list.py rename to tools/generate_company_id_list.py diff --git a/tools/rtk_fw_download.py b/tools/rtk_fw_download.py new file mode 100644 index 00000000..b0271410 --- /dev/null +++ b/tools/rtk_fw_download.py @@ -0,0 +1,149 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import logging +import pathlib +import urllib.request +import urllib.error + +import click + +from bumble.colors import color +from bumble.drivers import rtk +from bumble.tools import rtk_util + + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +LINUX_KERNEL_GIT_SOURCE = ( + "https://git.kernel.org/pub/scm/linux/kernel/git/firmware/linux-firmware.git/plain/rtl_bt", + False, +) +REALTEK_OPENSOURCE_SOURCE = ( + "https://github.com/Realtek-OpenSource/android_hardware_realtek/raw/rtk1395/bt/rtkbt/Firmware/BT", + True, +) +LINUX_FROM_SCRATCH_SOURCE = ( + "https://anduin.linuxfromscratch.org/sources/linux-firmware/rtl_bt", + False, +) + +# ----------------------------------------------------------------------------- +# Functions +# ----------------------------------------------------------------------------- +def download_file(base_url, name, remove_suffix): + if remove_suffix: + name = name.replace(".bin", "") + + url = f"{base_url}/{name}" + with urllib.request.urlopen(url) as file: + data = file.read() + print(f"Downloaded {name}: {len(data)} bytes") + return data + + +# ----------------------------------------------------------------------------- +@click.command +@click.option( + "--output-dir", + default=".", + help="Output directory where the files will be saved", + show_default=True, +) +@click.option( + "--source", + type=click.Choice(["linux-kernel", "realtek-opensource", "linux-from-scratch"]), + default="linux-kernel", + show_default=True, +) +@click.option("--single", help="Only download a single image set, by its base name") +@click.option("--force", is_flag=True, help="Overwrite files if they already exist") +@click.option("--parse", is_flag=True, help="Parse the FW image after saving") +def main(output_dir, source, single, force, parse): + """Download RTK firmware images and configs.""" + + # Check that the output dir exists + output_dir = pathlib.Path(output_dir) + if not output_dir.is_dir(): + print("Output dir does not exist or is not a directory") + return + + base_url, remove_suffix = { + "linux-kernel": LINUX_KERNEL_GIT_SOURCE, + "realtek-opensource": REALTEK_OPENSOURCE_SOURCE, + "linux-from-scratch": LINUX_FROM_SCRATCH_SOURCE, + }[source] + + print("Downloading") + print(color("FROM:", "green"), base_url) + print(color("TO:", "green"), output_dir) + + if single: + images = [(f"{single}_fw.bin", f"{single}_config.bin", True)] + else: + images = [ + (driver_info.fw_name, driver_info.config_name, driver_info.config_needed) + for driver_info in rtk.Driver.DRIVER_INFOS + ] + + for (fw_name, config_name, config_needed) in images: + print(color("---", "yellow")) + fw_image_out = output_dir / fw_name + if not force and fw_image_out.exists(): + print(color(f"{fw_image_out} already exists, skipping", "red")) + continue + if config_name: + config_image_out = output_dir / config_name + if not force and config_image_out.exists(): + print(color("f{config_out} already exists, skipping", "red")) + continue + + try: + fw_image = download_file(base_url, fw_name, remove_suffix) + except urllib.error.HTTPError as error: + print(f"Failed to download {fw_name}: {error}") + continue + + config_image = None + if config_name: + try: + config_image = download_file(base_url, config_name, remove_suffix) + except urllib.error.HTTPError as error: + if config_needed: + print(f"Failed to download {config_name}: {error}") + continue + else: + print(f"No config available as {config_name}") + + fw_image_out.write_bytes(fw_image) + if parse and config_name: + print(color("Parsing:", "cyan"), fw_name) + rtk_util.do_parse(fw_image_out) + if config_image: + config_image_out.write_bytes(config_image) + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + main() diff --git a/tools/rtk_util.py b/tools/rtk_util.py new file mode 100644 index 00000000..74529152 --- /dev/null +++ b/tools/rtk_util.py @@ -0,0 +1,161 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import logging +import asyncio +import os + +import click + +from bumble import transport +from bumble.host import Host +from bumble.drivers import rtk + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +def do_parse(firmware_path): + with open(firmware_path, 'rb') as firmware_file: + firmware_data = firmware_file.read() + firmware = rtk.Firmware(firmware_data) + print( + f"Firmware: version=0x{firmware.version:08X} " + f"project_id=0x{firmware.project_id:04X}" + ) + for patch in firmware.patches: + print( + f" Patch: chip_id=0x{patch[0]:04X}, " + f"{len(patch[1])} bytes, " + f"SVN Version={patch[2]:08X}" + ) + + +# ----------------------------------------------------------------------------- +async def do_load(usb_transport, force): + async with await transport.open_transport_or_link(usb_transport) as ( + hci_source, + hci_sink, + ): + # Create a host to communicate with the device + host = Host(hci_source, hci_sink) + await host.reset(driver_factory=None) + + # Get the driver. + driver = await rtk.Driver.for_host(host, force) + if driver is None: + if not force: + print("Firmware already loaded or no supported driver for this device.") + return + + await driver.download_firmware() + + +# ----------------------------------------------------------------------------- +async def do_drop(usb_transport): + async with await transport.open_transport_or_link(usb_transport) as ( + hci_source, + hci_sink, + ): + # Create a host to communicate with the device + host = Host(hci_source, hci_sink) + await host.reset(driver_factory=None) + + # Tell the device to reset/drop any loaded patch + await rtk.Driver.drop_firmware(host) + + +# ----------------------------------------------------------------------------- +async def do_info(usb_transport, force): + async with await transport.open_transport(usb_transport) as ( + hci_source, + hci_sink, + ): + # Create a host to communicate with the device + host = Host(hci_source, hci_sink) + await host.reset(driver_factory=None) + + # Check if this is a supported device. + if not force and not rtk.Driver.check(host): + print("USB device not supported by this RTK driver") + return + + # Get the driver info. + driver_info = await rtk.Driver.driver_info_for_host(host) + if driver_info: + print( + "Driver:\n" + f" ROM: {driver_info.rom:04X}\n" + f" Firmware: {driver_info.fw_name}\n" + f" Config: {driver_info.config_name}\n" + ) + else: + print("Firmware already loaded or no supported driver for this device.") + + +# ----------------------------------------------------------------------------- +@click.group() +def main(): + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) + + +@main.command +@click.argument("firmware_path") +def parse(firmware_path): + """Parse a firmware image.""" + do_parse(firmware_path) + + +@main.command +@click.argument("usb_transport") +@click.option( + "--force", + is_flag=True, + default=False, + help="Load even if the USB info doesn't match", +) +def load(usb_transport, force): + """Load a firmware image into the USB dongle.""" + asyncio.run(do_load(usb_transport, force)) + + +@main.command +@click.argument("usb_transport") +def drop(usb_transport): + """Drop a firmware image from the USB dongle.""" + asyncio.run(do_drop(usb_transport)) + + +@main.command +@click.argument("usb_transport") +@click.option( + "--force", + is_flag=True, + default=False, + help="Try to get the device info even if the USB info doesn't match", +) +def info(usb_transport, force): + """Get the firmware info from a USB dongle.""" + asyncio.run(do_info(usb_transport, force)) + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + main()