Skip to content

Commit

Permalink
aics: address first batch of review
Browse files Browse the repository at this point in the history
  • Loading branch information
SilverBzH committed Sep 9, 2024
1 parent 8462252 commit a7748dc
Showing 1 changed file with 98 additions and 99 deletions.
197 changes: 98 additions & 99 deletions bumble/profiles/aics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Le Audio - Audio Input Control Service"""
"""LE Audio - Audio Input Control Service"""

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import enum
import logging

from typing import Union, Optional
Expand Down Expand Up @@ -47,6 +46,8 @@

from bumble.gatt_client import ProfileServiceProxy, ServiceProxy

from utils import OpenIntEnum

# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
Expand All @@ -61,7 +62,7 @@
GAIN_SETTINGS_MAX_VALUE = 255


class ErrorCode(enum.IntEnum):
class ErrorCode(OpenIntEnum):
'''
Cf. 1.6 Application error codes
'''
Expand All @@ -73,7 +74,7 @@ class ErrorCode(enum.IntEnum):
GAIN_MODE_CHANGE_NOT_ALLOWED = 0x84


class Mute(enum.IntEnum):
class Mute(OpenIntEnum):
'''
Cf. 2.2.1.2 Mute Field
'''
Expand All @@ -83,7 +84,7 @@ class Mute(enum.IntEnum):
DISABLED = 0x02


class GainMode(enum.IntEnum):
class GainMode(OpenIntEnum):
'''
Cf. 2.2.1.3 Gain Mode
'''
Expand All @@ -94,7 +95,7 @@ class GainMode(enum.IntEnum):
AUTOMATIC = 0x03


class AudioInputStatus(enum.IntEnum):
class AudioInputStatus(OpenIntEnum):
'''
Cf. 3.4 Audio Input Status
'''
Expand All @@ -103,7 +104,7 @@ class AudioInputStatus(enum.IntEnum):
ACTIVE = 0x01


class AudioInputControlPointOpCode(enum.IntEnum):
class AudioInputControlPointOpCode(OpenIntEnum):
'''
Cf. 3.5.1 Audio Input Control Point procedure requirements
'''
Expand Down Expand Up @@ -159,17 +160,15 @@ def decrement_gain_settings(self) -> None:
self.increment_change_counter()

def increment_change_counter(self):
self.change_counter += 0x01
if self.change_counter > CHANGE_COUNTER_MAX_VALUE:
self.change_counter = 0x00
self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1)

async def notify_subscribers(self, connection: Connection) -> None:
await connection.device.notify_subscribers(
attribute=self.value, value=self.__bytes__()
attribute=self.value, value=bytes(self)
)

def _on_read(self, _connection: Optional[Connection]) -> bytes:
return self.__bytes__()
return bytes(self)


class GainSettingsProperties(Characteristic):
Expand Down Expand Up @@ -202,7 +201,7 @@ def __bytes__(self) -> bytes:
)

def _on_read(self, _connection: Optional[Connection]) -> bytes:
return self.__bytes__()
return bytes(self)


class AudioInputDescription(Characteristic):
Expand Down Expand Up @@ -261,115 +260,115 @@ def __init__(
async def _on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection

async def _set_gain_settings(gain_settings_operand: int) -> None:
'''Cf. 3.5.2.1 Set Gain Settings Procedure'''

gain_mode = self.audio_input_state.gain_mode

if not (gain_mode == GainMode.MANUAL or gain_mode == GainMode.MANUAL_ONLY):
logger.warning(
"GainMode should be either MANUAL or MANUAL_ONLY Cf Spec Audio Input Control Service 3.5.2.1"
)
return

if (
gain_settings_operand
< self.gain_settings_properties.gain_settings_minimum
or gain_settings_operand
> self.gain_settings_properties.gain_settings_maximum
):
logger.error("gain_seetings value out of range")
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
try:
opcode = AudioInputControlPointOpCode(value[0])

if self.audio_input_state.gain_settings != gain_settings_operand:
self.audio_input_state.gain_settings = gain_settings_operand
await self.audio_input_state.notify_subscribers(connection)
if opcode == AudioInputControlPointOpCode.SET_GAIN_SETTING:
gain_settings_operand = value[2]
await self._set_gain_settings(connection, gain_settings_operand)
elif opcode == AudioInputControlPointOpCode.UNMUTE:
await self._unmute(connection)
elif opcode == AudioInputControlPointOpCode.MUTE:
change_counter_operand = value[1]
await self._mute(connection, change_counter_operand)
elif opcode == AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE:
await self._set_manual_gain_mode(connection)
elif opcode == AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE:
await self._set_automatic_gain_mode(connection)

async def _unmute():
'''Cf. 3.5.2.2 Unmute procedure'''
except ValueError as e:
logger.error(f"OpCode value is incorrect: {e}")
raise ATT_Error(ErrorCode.OPCODE_NOT_SUPPORTED)

logger.error(f'unmute: {self.audio_input_state.mute}')
mute = self.audio_input_state.mute
if mute == Mute.DISABLED:
logger.error("unmute: Cannot change Mute value, Mute state is DISABLED")
raise ATT_Error(ErrorCode.MUTE_DISABLED)
async def _set_gain_settings(
self, connection: Connection, gain_settings_operand: int
) -> None:
'''Cf. 3.5.2.1 Set Gain Settings Procedure'''

if mute == Mute.NOT_MUTED:
return
gain_mode = self.audio_input_state.gain_mode

self.audio_input_state.mute = Mute.NOT_MUTED
self.audio_input_state.increment_change_counter()
if not (gain_mode == GainMode.MANUAL or gain_mode == GainMode.MANUAL_ONLY):
logger.warning(
"GainMode should be either MANUAL or MANUAL_ONLY Cf Spec Audio Input Control Service 3.5.2.1"
)
return

if (
gain_settings_operand < self.gain_settings_properties.gain_settings_minimum
or gain_settings_operand
> self.gain_settings_properties.gain_settings_maximum
):
logger.error("gain_seetings value out of range")
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)

if self.audio_input_state.gain_settings != gain_settings_operand:
self.audio_input_state.gain_settings = gain_settings_operand
await self.audio_input_state.notify_subscribers(connection)

async def _mute() -> None:
'''Cf. 3.5.5.2 Mute procedure'''
async def _unmute(self, connection: Connection):
'''Cf. 3.5.2.2 Unmute procedure'''

change_counter = self.audio_input_state.change_counter
mute = self.audio_input_state.mute
if mute == Mute.DISABLED:
logger.error("mute: Cannot change Mute value, Mute state is DISABLED")
raise ATT_Error(ErrorCode.MUTE_DISABLED)
logger.error(f'unmute: {self.audio_input_state.mute}')
mute = self.audio_input_state.mute
if mute == Mute.DISABLED:
logger.error("unmute: Cannot change Mute value, Mute state is DISABLED")
raise ATT_Error(ErrorCode.MUTE_DISABLED)

change_counter_operand = value[1]
if change_counter != change_counter_operand:
raise ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER)
if mute == Mute.NOT_MUTED:
return

if mute == Mute.MUTED:
return
self.audio_input_state.mute = Mute.NOT_MUTED
self.audio_input_state.increment_change_counter()
await self.audio_input_state.notify_subscribers(connection)

self.audio_input_state.mute = Mute.MUTED
self.audio_input_state.increment_change_counter()
await self.audio_input_state.notify_subscribers(connection)
async def _mute(self, connection: Connection, change_counter_operand: int) -> None:
'''Cf. 3.5.5.2 Mute procedure'''

async def _set_manual_gain_mode() -> None:
'''Cf. 3.5.2.4 Set Manual Gain Mode procedure'''
change_counter = self.audio_input_state.change_counter
mute = self.audio_input_state.mute
if mute == Mute.DISABLED:
logger.error("mute: Cannot change Mute value, Mute state is DISABLED")
raise ATT_Error(ErrorCode.MUTE_DISABLED)

gain_mode = self.audio_input_state.gain_mode
if gain_mode in (GainMode.AUTOMATIC_ONLY, GainMode.MANUAL_ONLY):
logger.error(f"Cannot change gain_mode, bad state: {gain_mode}")
raise ATT_Error(ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED)
if change_counter != change_counter_operand:
raise ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER)

if gain_mode == GainMode.MANUAL:
return
if mute == Mute.MUTED:
return

self.audio_input_state.gain_mode = GainMode.MANUAL
self.audio_input_state.increment_change_counter()
await self.audio_input_state.notify_subscribers(connection)
self.audio_input_state.mute = Mute.MUTED
self.audio_input_state.increment_change_counter()
await self.audio_input_state.notify_subscribers(connection)

async def _set_automatic_gain_mode() -> None:
'''Cf. 3.5.2.5 Set Automatic Gain Mode'''
async def _set_manual_gain_mode(self, connection: Connection) -> None:
'''Cf. 3.5.2.4 Set Manual Gain Mode procedure'''

gain_mode = self.audio_input_state.gain_mode
if gain_mode in (GainMode.AUTOMATIC_ONLY, GainMode.MANUAL_ONLY):
logger.error(f"Cannot change gain_mode, bad state: {gain_mode}")
raise ATT_Error(ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED)
gain_mode = self.audio_input_state.gain_mode
if gain_mode in (GainMode.AUTOMATIC_ONLY, GainMode.MANUAL_ONLY):
logger.error(f"Cannot change gain_mode, bad state: {gain_mode}")
raise ATT_Error(ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED)

if gain_mode == GainMode.AUTOMATIC:
return
if gain_mode == GainMode.MANUAL:
return

self.audio_input_state.gain_mode = GainMode.AUTOMATIC
self.audio_input_state.increment_change_counter()
await self.audio_input_state.notify_subscribers(connection)
self.audio_input_state.gain_mode = GainMode.MANUAL
self.audio_input_state.increment_change_counter()
await self.audio_input_state.notify_subscribers(connection)

try:
opcode = AudioInputControlPointOpCode(value[0])
async def _set_automatic_gain_mode(self, connection: Connection) -> None:
'''Cf. 3.5.2.5 Set Automatic Gain Mode'''

opcode = AudioInputControlPointOpCode(value[0])
gain_mode = self.audio_input_state.gain_mode
if gain_mode in (GainMode.AUTOMATIC_ONLY, GainMode.MANUAL_ONLY):
logger.error(f"Cannot change gain_mode, bad state: {gain_mode}")
raise ATT_Error(ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED)

if opcode == AudioInputControlPointOpCode.SET_GAIN_SETTING:
await _set_gain_settings(value[2])
elif opcode == AudioInputControlPointOpCode.UNMUTE:
await _unmute()
elif opcode == AudioInputControlPointOpCode.MUTE:
await _mute()
elif opcode == AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE:
await _set_manual_gain_mode()
elif opcode == AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE:
await _set_automatic_gain_mode()
if gain_mode == GainMode.AUTOMATIC:
return

except ValueError as e:
logger.error(f"OpCode value is incorrect: {e}")
raise ATT_Error(ErrorCode.OPCODE_NOT_SUPPORTED)
self.audio_input_state.gain_mode = GainMode.AUTOMATIC
self.audio_input_state.increment_change_counter()
await self.audio_input_state.notify_subscribers(connection)


class AICSService(TemplateService):
Expand Down

0 comments on commit a7748dc

Please sign in to comment.