Skip to content

Commit

Permalink
Reorganize exceptions
Browse files Browse the repository at this point in the history
* Add BaseBumbleException as a "real" root error
* Add several core error classes and properly replace builtin errors
  with them
* Add several error classes for specific modules (transport, device)
  • Loading branch information
zxzxwu committed Jun 4, 2024
1 parent 0903093 commit d4aefdd
Show file tree
Hide file tree
Showing 26 changed files with 260 additions and 159 deletions.
18 changes: 12 additions & 6 deletions bumble/at.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@

from typing import List, Union

from bumble import core


class AtParsingError(core.InvalidPacketError):
"""Error raised when parsing AT commands fails."""


def tokenize_parameters(buffer: bytes) -> List[bytes]:
"""Split input parameters into tokens.
Removes space characters outside of double quote blocks:
T-rec-V-25 - 5.2.1 Command line general format: "Space characters (IA5 2/0)
are ignored [..], unless they are embedded in numeric or string constants"
Raises ValueError in case of invalid input string."""
Raises AtParsingError in case of invalid input string."""

tokens = []
in_quotes = False
Expand All @@ -43,11 +49,11 @@ def tokenize_parameters(buffer: bytes) -> List[bytes]:
token = bytearray()
elif char == b'(':
if len(token) > 0:
raise ValueError("open_paren following regular character")
raise AtParsingError("open_paren following regular character")
tokens.append(char)
elif char == b'"':
if len(token) > 0:
raise ValueError("quote following regular character")
raise AtParsingError("quote following regular character")
in_quotes = True
token.extend(char)
else:
Expand All @@ -59,7 +65,7 @@ def tokenize_parameters(buffer: bytes) -> List[bytes]:

def parse_parameters(buffer: bytes) -> List[Union[bytes, list]]:
"""Parse the parameters using the comma and parenthesis separators.
Raises ValueError in case of invalid input string."""
Raises AtParsingError in case of invalid input string."""

tokens = tokenize_parameters(buffer)
accumulator: List[list] = [[]]
Expand All @@ -73,13 +79,13 @@ def parse_parameters(buffer: bytes) -> List[Union[bytes, list]]:
accumulator.append([])
elif token == b')':
if len(accumulator) < 2:
raise ValueError("close_paren without matching open_paren")
raise AtParsingError("close_paren without matching open_paren")
accumulator[-1].append(current)
current = accumulator.pop()
else:
current = token

accumulator[-1].append(current)
if len(accumulator) > 1:
raise ValueError("missing close_paren")
raise AtParsingError("missing close_paren")
return accumulator[0]
13 changes: 8 additions & 5 deletions bumble/avc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import struct
from typing import Dict, Type, Union, Tuple

from bumble import core
from bumble.utils import OpenIntEnum


Expand Down Expand Up @@ -88,7 +89,9 @@ def subclass(subclass):
short_name = subclass.__name__.replace("ResponseFrame", "")
category_class = ResponseFrame
else:
raise ValueError(f"invalid subclass name {subclass.__name__}")
raise core.InvalidArgumentError(
f"invalid subclass name {subclass.__name__}"
)

uppercase_indexes = [
i for i in range(len(short_name)) if short_name[i].isupper()
Expand All @@ -106,7 +109,7 @@ def subclass(subclass):
@staticmethod
def from_bytes(data: bytes) -> Frame:
if data[0] >> 4 != 0:
raise ValueError("first 4 bits must be 0s")
raise core.InvalidPacketError("first 4 bits must be 0s")

ctype_or_response = data[0] & 0xF
subunit_type = Frame.SubunitType(data[1] >> 3)
Expand All @@ -122,7 +125,7 @@ def from_bytes(data: bytes) -> Frame:
# Extended to the next byte
extension = data[2]
if extension == 0:
raise ValueError("extended subunit ID value reserved")
raise core.InvalidPacketError("extended subunit ID value reserved")
if extension == 0xFF:
subunit_id = 5 + 254 + data[3]
opcode_offset = 4
Expand All @@ -131,7 +134,7 @@ def from_bytes(data: bytes) -> Frame:
opcode_offset = 3

elif subunit_id == 6:
raise ValueError("reserved subunit ID")
raise core.InvalidPacketError("reserved subunit ID")

opcode = Frame.OperationCode(data[opcode_offset])
operands = data[opcode_offset + 1 :]
Expand Down Expand Up @@ -448,7 +451,7 @@ def __init__(
operation_data: bytes,
) -> None:
if len(operation_data) > 255:
raise ValueError("operation data must be <= 255 bytes")
raise core.InvalidArgumentError("operation data must be <= 255 bytes")
self.state_flag = state_flag
self.operation_id = operation_id
self.operation_data = operation_data
Expand Down
5 changes: 3 additions & 2 deletions bumble/avctp.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from bumble.colors import color
from bumble import avc
from bumble import core
from bumble import l2cap

# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -275,7 +276,7 @@ def unregister_command_handler(
self, pid: int, handler: Protocol.CommandHandler
) -> None:
if pid not in self.command_handlers or self.command_handlers[pid] != handler:
raise ValueError("command handler not registered")
raise core.InvalidArgumentError("command handler not registered")
del self.command_handlers[pid]

def register_response_handler(
Expand All @@ -287,5 +288,5 @@ def unregister_response_handler(
self, pid: int, handler: Protocol.ResponseHandler
) -> None:
if pid not in self.response_handlers or self.response_handlers[pid] != handler:
raise ValueError("response handler not registered")
raise core.InvalidArgumentError("response handler not registered")
del self.response_handlers[pid]
3 changes: 2 additions & 1 deletion bumble/avdtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE,
InvalidStateError,
ProtocolError,
InvalidArgumentError,
name_or_number,
)
from .a2dp import (
Expand Down Expand Up @@ -700,7 +701,7 @@ def subclass(subclass):
signal_identifier_str = name[:-7]
message_type = Message.MessageType.RESPONSE_REJECT
else:
raise ValueError('invalid class name')
raise InvalidArgumentError('invalid class name')

subclass.message_type = message_type

Expand Down
3 changes: 2 additions & 1 deletion bumble/avrcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
)
from bumble.utils import AsyncRunner, OpenIntEnum
from bumble.core import (
InvalidArgumentError,
ProtocolError,
BT_L2CAP_PROTOCOL_ID,
BT_AVCTP_PROTOCOL_ID,
Expand Down Expand Up @@ -1411,7 +1412,7 @@ def notify_playback_status_changed(self, status: PlayStatus) -> None:
def notify_track_changed(self, identifier: bytes) -> None:
"""Notify the connected peer of a Track change."""
if len(identifier) != 8:
raise ValueError("identifier must be 8 bytes")
raise InvalidArgumentError("identifier must be 8 bytes")
self.notify_event(TrackChangedEvent(identifier))

def notify_playback_position_changed(self, position: int) -> None:
Expand Down
30 changes: 17 additions & 13 deletions bumble/codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from __future__ import annotations
from dataclasses import dataclass

from bumble import core


# -----------------------------------------------------------------------------
class BitReader:
Expand All @@ -40,7 +42,7 @@ def read(self, bits: int) -> int:
""" "Read up to 32 bits."""

if bits > 32:
raise ValueError('maximum read size is 32')
raise core.InvalidArgumentError('maximum read size is 32')

if self.bits_cached >= bits:
# We have enough bits.
Expand All @@ -53,7 +55,7 @@ def read(self, bits: int) -> int:
feed_size = len(feed_bytes)
feed_int = int.from_bytes(feed_bytes, byteorder='big')
if 8 * feed_size + self.bits_cached < bits:
raise ValueError('trying to read past the data')
raise core.InvalidArgumentError('trying to read past the data')
self.byte_position += feed_size

# Combine the new cache and the old cache
Expand All @@ -68,7 +70,7 @@ def read(self, bits: int) -> int:

def read_bytes(self, count: int):
if self.bit_position + 8 * count > 8 * len(self.data):
raise ValueError('not enough data')
raise core.InvalidArgumentError('not enough data')

if self.bit_position % 8:
# Not byte aligned
Expand Down Expand Up @@ -113,7 +115,7 @@ def latm_value(reader: BitReader) -> int:

@staticmethod
def program_config_element(reader: BitReader):
raise ValueError('program_config_element not supported')
raise core.InvalidPacketError('program_config_element not supported')

@dataclass
class GASpecificConfig:
Expand All @@ -140,7 +142,7 @@ def __init__(
aac_spectral_data_resilience_flags = reader.read(1)
extension_flag_3 = reader.read(1)
if extension_flag_3 == 1:
raise ValueError('extensionFlag3 == 1 not supported')
raise core.InvalidPacketError('extensionFlag3 == 1 not supported')

@staticmethod
def audio_object_type(reader: BitReader):
Expand Down Expand Up @@ -216,7 +218,7 @@ def __init__(self, reader: BitReader) -> None:
reader, self.channel_configuration, self.audio_object_type
)
else:
raise ValueError(
raise core.InvalidPacketError(
f'audioObjectType {self.audio_object_type} not supported'
)

Expand Down Expand Up @@ -260,18 +262,18 @@ def __init__(self, reader: BitReader) -> None:
else:
audio_mux_version_a = 0
if audio_mux_version_a != 0:
raise ValueError('audioMuxVersionA != 0 not supported')
raise core.InvalidPacketError('audioMuxVersionA != 0 not supported')
if audio_mux_version == 1:
tara_buffer_fullness = AacAudioRtpPacket.latm_value(reader)
stream_cnt = 0
all_streams_same_time_framing = reader.read(1)
num_sub_frames = reader.read(6)
num_program = reader.read(4)
if num_program != 0:
raise ValueError('num_program != 0 not supported')
raise core.InvalidPacketError('num_program != 0 not supported')
num_layer = reader.read(3)
if num_layer != 0:
raise ValueError('num_layer != 0 not supported')
raise core.InvalidPacketError('num_layer != 0 not supported')
if audio_mux_version == 0:
self.audio_specific_config = AacAudioRtpPacket.AudioSpecificConfig(
reader
Expand All @@ -284,7 +286,7 @@ def __init__(self, reader: BitReader) -> None:
)
audio_specific_config_len = reader.bit_position - marker
if asc_len < audio_specific_config_len:
raise ValueError('audio_specific_config_len > asc_len')
raise core.InvalidPacketError('audio_specific_config_len > asc_len')
asc_len -= audio_specific_config_len
reader.skip(asc_len)
frame_length_type = reader.read(3)
Expand All @@ -293,7 +295,9 @@ def __init__(self, reader: BitReader) -> None:
elif frame_length_type == 1:
frame_length = reader.read(9)
else:
raise ValueError(f'frame_length_type {frame_length_type} not supported')
raise core.InvalidPacketError(
f'frame_length_type {frame_length_type} not supported'
)

self.other_data_present = reader.read(1)
if self.other_data_present:
Expand All @@ -318,12 +322,12 @@ class AudioMuxElement:

def __init__(self, reader: BitReader, mux_config_present: int):
if mux_config_present == 0:
raise ValueError('muxConfigPresent == 0 not supported')
raise core.InvalidPacketError('muxConfigPresent == 0 not supported')

# AudioMuxElement - ISO/EIC 14496-3 Table 1.41
use_same_stream_mux = reader.read(1)
if use_same_stream_mux:
raise ValueError('useSameStreamMux == 1 not supported')
raise core.InvalidPacketError('useSameStreamMux == 1 not supported')
self.stream_mux_config = AacAudioRtpPacket.StreamMuxConfig(reader)

# We only support:
Expand Down
8 changes: 6 additions & 2 deletions bumble/colors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
from typing import List, Optional, Union


class ColorError(ValueError):
"""Error raised when a color spec is invalid."""


# ANSI color names. There is also a "default"
COLORS = ('black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')

Expand Down Expand Up @@ -52,7 +56,7 @@ def _color_code(spec: ColorSpec, base: int) -> str:
elif isinstance(spec, int) and 0 <= spec <= 255:
return _join(base + 8, 5, spec)
else:
raise ValueError('Invalid color spec "%s"' % spec)
raise ColorError('Invalid color spec "%s"' % spec)


def color(
Expand All @@ -72,7 +76,7 @@ def color(
if style_part in STYLES:
codes.append(STYLES.index(style_part))
else:
raise ValueError('Invalid style "%s"' % style_part)
raise ColorError('Invalid style "%s"' % style_part)

if codes:
return '\x1b[{0}m{1}\x1b[0m'.format(_join(*codes), s)
Expand Down
Loading

0 comments on commit d4aefdd

Please sign in to comment.