Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorganize exceptions #501

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions bumble/at.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@

from typing import List, Union

from bumble import core


class AtParsingError(core.InvalidPacketError):
"""Error raised when parsing AT commands fails."""


def tokenize_parameters(buffer: bytes) -> List[bytes]:
"""Split input parameters into tokens.
Removes space characters outside of double quote blocks:
T-rec-V-25 - 5.2.1 Command line general format: "Space characters (IA5 2/0)
are ignored [..], unless they are embedded in numeric or string constants"
Raises ValueError in case of invalid input string."""
Raises AtParsingError in case of invalid input string."""

tokens = []
in_quotes = False
Expand All @@ -43,11 +49,11 @@ def tokenize_parameters(buffer: bytes) -> List[bytes]:
token = bytearray()
elif char == b'(':
if len(token) > 0:
raise ValueError("open_paren following regular character")
raise AtParsingError("open_paren following regular character")
tokens.append(char)
elif char == b'"':
if len(token) > 0:
raise ValueError("quote following regular character")
raise AtParsingError("quote following regular character")
in_quotes = True
token.extend(char)
else:
Expand All @@ -59,7 +65,7 @@ def tokenize_parameters(buffer: bytes) -> List[bytes]:

def parse_parameters(buffer: bytes) -> List[Union[bytes, list]]:
"""Parse the parameters using the comma and parenthesis separators.
Raises ValueError in case of invalid input string."""
Raises AtParsingError in case of invalid input string."""

tokens = tokenize_parameters(buffer)
accumulator: List[list] = [[]]
Expand All @@ -73,13 +79,13 @@ def parse_parameters(buffer: bytes) -> List[Union[bytes, list]]:
accumulator.append([])
elif token == b')':
if len(accumulator) < 2:
raise ValueError("close_paren without matching open_paren")
raise AtParsingError("close_paren without matching open_paren")
accumulator[-1].append(current)
current = accumulator.pop()
else:
current = token

accumulator[-1].append(current)
if len(accumulator) > 1:
raise ValueError("missing close_paren")
raise AtParsingError("missing close_paren")
return accumulator[0]
13 changes: 8 additions & 5 deletions bumble/avc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import struct
from typing import Dict, Type, Union, Tuple

from bumble import core
from bumble.utils import OpenIntEnum


Expand Down Expand Up @@ -88,7 +89,9 @@ def subclass(subclass):
short_name = subclass.__name__.replace("ResponseFrame", "")
category_class = ResponseFrame
else:
raise ValueError(f"invalid subclass name {subclass.__name__}")
raise core.InvalidArgumentError(
f"invalid subclass name {subclass.__name__}"
)

uppercase_indexes = [
i for i in range(len(short_name)) if short_name[i].isupper()
Expand All @@ -106,7 +109,7 @@ def subclass(subclass):
@staticmethod
def from_bytes(data: bytes) -> Frame:
if data[0] >> 4 != 0:
raise ValueError("first 4 bits must be 0s")
raise core.InvalidPacketError("first 4 bits must be 0s")

ctype_or_response = data[0] & 0xF
subunit_type = Frame.SubunitType(data[1] >> 3)
Expand All @@ -122,7 +125,7 @@ def from_bytes(data: bytes) -> Frame:
# Extended to the next byte
extension = data[2]
if extension == 0:
raise ValueError("extended subunit ID value reserved")
raise core.InvalidPacketError("extended subunit ID value reserved")
if extension == 0xFF:
subunit_id = 5 + 254 + data[3]
opcode_offset = 4
Expand All @@ -131,7 +134,7 @@ def from_bytes(data: bytes) -> Frame:
opcode_offset = 3

elif subunit_id == 6:
raise ValueError("reserved subunit ID")
raise core.InvalidPacketError("reserved subunit ID")

opcode = Frame.OperationCode(data[opcode_offset])
operands = data[opcode_offset + 1 :]
Expand Down Expand Up @@ -448,7 +451,7 @@ def __init__(
operation_data: bytes,
) -> None:
if len(operation_data) > 255:
raise ValueError("operation data must be <= 255 bytes")
raise core.InvalidArgumentError("operation data must be <= 255 bytes")
self.state_flag = state_flag
self.operation_id = operation_id
self.operation_data = operation_data
Expand Down
5 changes: 3 additions & 2 deletions bumble/avctp.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from bumble.colors import color
from bumble import avc
from bumble import core
from bumble import l2cap

# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -275,7 +276,7 @@ def unregister_command_handler(
self, pid: int, handler: Protocol.CommandHandler
) -> None:
if pid not in self.command_handlers or self.command_handlers[pid] != handler:
raise ValueError("command handler not registered")
raise core.InvalidArgumentError("command handler not registered")
del self.command_handlers[pid]

def register_response_handler(
Expand All @@ -287,5 +288,5 @@ def unregister_response_handler(
self, pid: int, handler: Protocol.ResponseHandler
) -> None:
if pid not in self.response_handlers or self.response_handlers[pid] != handler:
raise ValueError("response handler not registered")
raise core.InvalidArgumentError("response handler not registered")
del self.response_handlers[pid]
3 changes: 2 additions & 1 deletion bumble/avdtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE,
InvalidStateError,
ProtocolError,
InvalidArgumentError,
name_or_number,
)
from .a2dp import (
Expand Down Expand Up @@ -700,7 +701,7 @@ def subclass(subclass):
signal_identifier_str = name[:-7]
message_type = Message.MessageType.RESPONSE_REJECT
else:
raise ValueError('invalid class name')
raise InvalidArgumentError('invalid class name')

subclass.message_type = message_type

Expand Down
3 changes: 2 additions & 1 deletion bumble/avrcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
)
from bumble.utils import AsyncRunner, OpenIntEnum
from bumble.core import (
InvalidArgumentError,
ProtocolError,
BT_L2CAP_PROTOCOL_ID,
BT_AVCTP_PROTOCOL_ID,
Expand Down Expand Up @@ -1411,7 +1412,7 @@ def notify_playback_status_changed(self, status: PlayStatus) -> None:
def notify_track_changed(self, identifier: bytes) -> None:
"""Notify the connected peer of a Track change."""
if len(identifier) != 8:
raise ValueError("identifier must be 8 bytes")
raise InvalidArgumentError("identifier must be 8 bytes")
self.notify_event(TrackChangedEvent(identifier))

def notify_playback_position_changed(self, position: int) -> None:
Expand Down
30 changes: 17 additions & 13 deletions bumble/codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from __future__ import annotations
from dataclasses import dataclass

from bumble import core


# -----------------------------------------------------------------------------
class BitReader:
Expand All @@ -40,7 +42,7 @@ def read(self, bits: int) -> int:
""" "Read up to 32 bits."""

if bits > 32:
raise ValueError('maximum read size is 32')
raise core.InvalidArgumentError('maximum read size is 32')

if self.bits_cached >= bits:
# We have enough bits.
Expand All @@ -53,7 +55,7 @@ def read(self, bits: int) -> int:
feed_size = len(feed_bytes)
feed_int = int.from_bytes(feed_bytes, byteorder='big')
if 8 * feed_size + self.bits_cached < bits:
raise ValueError('trying to read past the data')
raise core.InvalidArgumentError('trying to read past the data')
self.byte_position += feed_size

# Combine the new cache and the old cache
Expand All @@ -68,7 +70,7 @@ def read(self, bits: int) -> int:

def read_bytes(self, count: int):
if self.bit_position + 8 * count > 8 * len(self.data):
raise ValueError('not enough data')
raise core.InvalidArgumentError('not enough data')

if self.bit_position % 8:
# Not byte aligned
Expand Down Expand Up @@ -113,7 +115,7 @@ def latm_value(reader: BitReader) -> int:

@staticmethod
def program_config_element(reader: BitReader):
raise ValueError('program_config_element not supported')
raise core.InvalidPacketError('program_config_element not supported')

@dataclass
class GASpecificConfig:
Expand All @@ -140,7 +142,7 @@ def __init__(
aac_spectral_data_resilience_flags = reader.read(1)
extension_flag_3 = reader.read(1)
if extension_flag_3 == 1:
raise ValueError('extensionFlag3 == 1 not supported')
raise core.InvalidPacketError('extensionFlag3 == 1 not supported')

@staticmethod
def audio_object_type(reader: BitReader):
Expand Down Expand Up @@ -216,7 +218,7 @@ def __init__(self, reader: BitReader) -> None:
reader, self.channel_configuration, self.audio_object_type
)
else:
raise ValueError(
raise core.InvalidPacketError(
f'audioObjectType {self.audio_object_type} not supported'
)

Expand Down Expand Up @@ -260,18 +262,18 @@ def __init__(self, reader: BitReader) -> None:
else:
audio_mux_version_a = 0
if audio_mux_version_a != 0:
raise ValueError('audioMuxVersionA != 0 not supported')
raise core.InvalidPacketError('audioMuxVersionA != 0 not supported')
if audio_mux_version == 1:
tara_buffer_fullness = AacAudioRtpPacket.latm_value(reader)
stream_cnt = 0
all_streams_same_time_framing = reader.read(1)
num_sub_frames = reader.read(6)
num_program = reader.read(4)
if num_program != 0:
raise ValueError('num_program != 0 not supported')
raise core.InvalidPacketError('num_program != 0 not supported')
num_layer = reader.read(3)
if num_layer != 0:
raise ValueError('num_layer != 0 not supported')
raise core.InvalidPacketError('num_layer != 0 not supported')
if audio_mux_version == 0:
self.audio_specific_config = AacAudioRtpPacket.AudioSpecificConfig(
reader
Expand All @@ -284,7 +286,7 @@ def __init__(self, reader: BitReader) -> None:
)
audio_specific_config_len = reader.bit_position - marker
if asc_len < audio_specific_config_len:
raise ValueError('audio_specific_config_len > asc_len')
raise core.InvalidPacketError('audio_specific_config_len > asc_len')
asc_len -= audio_specific_config_len
reader.skip(asc_len)
frame_length_type = reader.read(3)
Expand All @@ -293,7 +295,9 @@ def __init__(self, reader: BitReader) -> None:
elif frame_length_type == 1:
frame_length = reader.read(9)
else:
raise ValueError(f'frame_length_type {frame_length_type} not supported')
raise core.InvalidPacketError(
f'frame_length_type {frame_length_type} not supported'
)

self.other_data_present = reader.read(1)
if self.other_data_present:
Expand All @@ -318,12 +322,12 @@ class AudioMuxElement:

def __init__(self, reader: BitReader, mux_config_present: int):
if mux_config_present == 0:
raise ValueError('muxConfigPresent == 0 not supported')
raise core.InvalidPacketError('muxConfigPresent == 0 not supported')

# AudioMuxElement - ISO/EIC 14496-3 Table 1.41
use_same_stream_mux = reader.read(1)
if use_same_stream_mux:
raise ValueError('useSameStreamMux == 1 not supported')
raise core.InvalidPacketError('useSameStreamMux == 1 not supported')
self.stream_mux_config = AacAudioRtpPacket.StreamMuxConfig(reader)

# We only support:
Expand Down
8 changes: 6 additions & 2 deletions bumble/colors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
from typing import List, Optional, Union


class ColorError(ValueError):
"""Error raised when a color spec is invalid."""


# ANSI color names. There is also a "default"
COLORS = ('black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')

Expand Down Expand Up @@ -52,7 +56,7 @@ def _color_code(spec: ColorSpec, base: int) -> str:
elif isinstance(spec, int) and 0 <= spec <= 255:
return _join(base + 8, 5, spec)
else:
raise ValueError('Invalid color spec "%s"' % spec)
raise ColorError('Invalid color spec "%s"' % spec)


def color(
Expand All @@ -72,7 +76,7 @@ def color(
if style_part in STYLES:
codes.append(STYLES.index(style_part))
else:
raise ValueError('Invalid style "%s"' % style_part)
raise ColorError('Invalid style "%s"' % style_part)

if codes:
return '\x1b[{0}m{1}\x1b[0m'.format(_join(*codes), s)
Expand Down
Loading
Loading