Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/python-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: '3.x'
- name: Install dependencies
Expand Down
17 changes: 15 additions & 2 deletions .github/workflows/python-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,23 @@ jobs:
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
fail-fast: false
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python-version }}
- name: Run tests
run: python -m unittest

mypy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install mypy
run: pip install mypy
- name: Run mypy --strict
run: mypy --strict pynobo/
143 changes: 104 additions & 39 deletions pynobo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import time
import warnings
import socket
from typing import Any, Callable, Union
from typing import Any, Callable, Final, Literal, TypedDict, Union, cast

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,6 +52,64 @@ class PynoboValidationError(PynoboError, ValueError, TypeError):
"""Raised for invalid parameters. Inherits ValueError and TypeError for back-compat."""


ModelType = Literal[
"THERMOSTAT_HEATER",
"THERMOSTAT_FLOOR",
"THERMOSTAT_ROOM",
"SWITCH",
"SWITCH_OUTLET",
"CONTROL_PANEL",
"UNKNOWN",
]


class HubInfo(TypedDict):
serial: str
name: str
default_away_override_length: str
override_id: str
software_version: str
hardware_version: str
production_date: str


class ZoneInfo(TypedDict):
zone_id: str
name: str
week_profile_id: str
temp_comfort_c: int | str
temp_eco_c: int | str
override_allowed: str
deprecated_override_id: str


class ComponentInfo(TypedDict):
serial: str
status: str
name: str
reverse_onoff: str
zone_id: str
override_id: str
tempsensor_for_zone_id: str
model: "nobo.Model"


class WeekProfileInfo(TypedDict):
week_profile_id: str
name: str
profile: list[str]


class OverrideInfo(TypedDict):
override_id: str
mode: str
type: str
end_time: str
start_time: str
target_type: str
target_id: str


class nobo:
"""This is where all the Nobø Hub magic happens!"""

Expand Down Expand Up @@ -192,7 +250,8 @@ class API:
DICT_NAME_TO_OVERRIDE_MODE = {NAME_NORMAL : OVERRIDE_MODE_NORMAL, NAME_COMFORT : OVERRIDE_MODE_COMFORT, NAME_ECO : OVERRIDE_MODE_ECO, NAME_AWAY : OVERRIDE_MODE_AWAY}
DICT_NAME_TO_WEEK_PROFILE_STATUS = {NAME_ECO : WEEK_PROFILE_STATE_ECO, NAME_COMFORT : WEEK_PROFILE_STATE_COMFORT, NAME_AWAY : WEEK_PROFILE_STATE_AWAY, NAME_OFF : WEEK_PROFILE_STATE_OFF}

def is_valid_datetime(timestamp: str):
@staticmethod
def is_valid_datetime(timestamp: str) -> bool:
if len(timestamp) != 12:
# Leading zero is optional for some of the fields below, but we require it.
return False
Expand All @@ -202,9 +261,11 @@ def is_valid_datetime(timestamp: str):
return False
return True

def time_is_quarter(minutes: str):
@staticmethod
def time_is_quarter(minutes: str) -> bool:
return int(minutes) % 15 == 0

@staticmethod
def validate_temperature(temperature: Union[int, str]) -> None:
if type(temperature) not in (int, str):
raise PynoboValidationError('Temperature must be integer or string')
Expand All @@ -216,6 +277,7 @@ def validate_temperature(temperature: Union[int, str]) -> None:
if temperature_int > 30:
raise PynoboValidationError(f'Max temperature is 30°C')

@staticmethod
def validate_week_profile(profile: list[str]) -> None:
if type(profile) != list:
raise PynoboValidationError("Week profile must be a list")
Expand Down Expand Up @@ -244,23 +306,23 @@ class Model:
https://help.nobo.no/en/user-manual/before-you-start/what-is-a-transmitter/list-of-transmitters/
"""

THERMOSTAT_HEATER = "THERMOSTAT_HEATER"
THERMOSTAT_FLOOR = "THERMOSTAT_FLOOR"
THERMOSTAT_ROOM = "THERMOSTAT_ROOM"
SWITCH = "SWITCH"
SWITCH_OUTLET = "SWITCH_OUTLET"
CONTROL_PANEL = "CONTROL_PANEL"
UNKNOWN = "UNKNOWN"
THERMOSTAT_HEATER: Final = "THERMOSTAT_HEATER"
THERMOSTAT_FLOOR: Final = "THERMOSTAT_FLOOR"
THERMOSTAT_ROOM: Final = "THERMOSTAT_ROOM"
SWITCH: Final = "SWITCH"
SWITCH_OUTLET: Final = "SWITCH_OUTLET"
CONTROL_PANEL: Final = "CONTROL_PANEL"
UNKNOWN: Final = "UNKNOWN"

def __init__(
self,
model_id: str,
type: Union[THERMOSTAT_HEATER, THERMOSTAT_FLOOR, THERMOSTAT_ROOM, SWITCH, SWITCH_OUTLET, CONTROL_PANEL, UNKNOWN],
type: ModelType,
name: str,
*,
supports_comfort: bool = False,
supports_eco: bool = False,
requires_control_panel = False,
requires_control_panel: bool = False,
has_temp_sensor: bool = False):
self._model_id = model_id
self._type = type
Expand All @@ -281,7 +343,7 @@ def name(self) -> str:
return self._name

@property
def type(self) -> Union[THERMOSTAT_HEATER, THERMOSTAT_FLOOR, THERMOSTAT_ROOM, SWITCH, SWITCH_OUTLET, CONTROL_PANEL, UNKNOWN]:
def type(self) -> ModelType:
"""Model type."""
return self._type

Expand Down Expand Up @@ -355,25 +417,24 @@ def datagram_received(self, data: bytes, addr: tuple[str | Any, ...]) -> None:
if msg.startswith('__NOBOHUB__'):
discover_serial = msg[11:]
discover_ip = addr[0]
matched_serial: str | None = None
if len(self.serial) == 12:
if discover_serial != self.serial[0:9]:
# This is not the Ecohub you are looking for
discover_serial = None
else:
discover_serial = self.serial
if discover_serial == self.serial[0:9]:
matched_serial = self.serial
else:
discover_serial += self.serial
matched_serial = discover_serial + self.serial
matched_ip: str | None = discover_ip
if self.ip and discover_ip != self.ip:
# This is not the Ecohub you are looking for
discover_ip = None
if discover_ip and discover_serial:
self.hubs.add( (discover_ip, discover_serial) )

hub_info: dict[str, Any]
zones: dict[str, dict[str, Any]]
components: dict[str, dict[str, Any]]
week_profiles: dict[str, dict[str, Any]]
overrides: dict[str, dict[str, Any]]
matched_ip = None
if matched_ip and matched_serial:
self.hubs.add((matched_ip, matched_serial))

hub_info: HubInfo
zones: dict[str, ZoneInfo]
components: dict[str, ComponentInfo]
week_profiles: dict[str, WeekProfileInfo]
overrides: dict[str, OverrideInfo]
temperatures: dict[str, str]

def __init__(
Expand Down Expand Up @@ -419,7 +480,7 @@ def __init__(
self._last_recv_at: float = 0.0

self._received_all_info = False
self.hub_info = {}
self.hub_info = cast(HubInfo, {})
self.zones = collections.OrderedDict()
self.components = collections.OrderedDict()
self.week_profiles = collections.OrderedDict()
Expand Down Expand Up @@ -678,7 +739,10 @@ async def reconnect_hub(self) -> None:
except PynoboConnectionError as inner:
_LOGGER.warning("Failed to connect to %s: %s", discover_ip, inner)
else:
connected = await self.async_connect_hub(self.ip, self.serial)
# self.ip can be None here; the underlying open_connection
# call surfaces it as PynoboConnectionError which the outer
# retry loop already handles.
connected = await self.async_connect_hub(self.ip, self.serial) # type: ignore[arg-type]
except PynoboHandshakeError:
raise # unrecoverable — propagate so socket_receive's outer arm can stop() us
except PynoboConnectionError as e:
Expand Down Expand Up @@ -858,6 +922,7 @@ async def get_response(self) -> list[str]:

:return: a single response as a list of strings where each string is a field
"""
assert self._reader is not None, "get_response called before connection established"
try:
message = await self._reader.readuntil(b'\r')
message = message[:-1]
Expand Down Expand Up @@ -911,7 +976,7 @@ async def socket_receive(self) -> None:
await self.stop()
except Exception as e:
# Ops, now we have real problems
_LOGGER.error('Unhandled exception %s', e, exc_info=1)
_LOGGER.error('Unhandled exception %s', e, exc_info=True)
# Just disconnect (instead of risking an infinite reconnect loop)
await self.stop()

Expand All @@ -925,16 +990,16 @@ def response_handler(self, response: list[str]) -> None:
# All info incoming, clear existing info
if response[0] == nobo.API.RESPONSE_SENDING_ALL_INFO:
self._received_all_info = False
self.hub_info = {}
self.hub_info = cast(HubInfo, {})
self.zones = {}
self.components = {}
self.week_profiles = {}
self.overrides = {}

# The added/updated info messages
elif response[0] in [nobo.API.RESPONSE_ZONE_INFO, nobo.API.RESPONSE_ADD_ZONE , nobo.API.RESPONSE_UPDATE_ZONE]:
dicti = collections.OrderedDict(zip(nobo.API.STRUCT_KEYS_ZONE, response[1:]))
self.zones[dicti['zone_id']] = dicti
dicti: collections.OrderedDict[str, Any] = collections.OrderedDict(zip(nobo.API.STRUCT_KEYS_ZONE, response[1:]))
self.zones[dicti['zone_id']] = cast(ZoneInfo, dicti)
_LOGGER.info('added/updated zone: %s', dicti['name'])

elif response[0] in [nobo.API.RESPONSE_COMPONENT_INFO, nobo.API.RESPONSE_ADD_COMPONENT , nobo.API.RESPONSE_UPDATE_COMPONENT]:
Expand All @@ -951,22 +1016,22 @@ def response_handler(self, response: list[str]) -> None:
nobo.Model.UNKNOWN,
f'Unknown (serial number: {serial[:3]} {serial[3:6]} {serial[6:9]} {serial[9:]})'
)
self.components[dicti['serial']] = dicti
self.components[dicti['serial']] = cast(ComponentInfo, dicti)
_LOGGER.info('added/updated component: %s', dicti['name'])

elif response[0] in [nobo.API.RESPONSE_WEEK_PROFILE_INFO, nobo.API.RESPONSE_ADD_WEEK_PROFILE, nobo.API.RESPONSE_UPDATE_WEEK_PROFILE]:
dicti = collections.OrderedDict(zip(nobo.API.STRUCT_KEYS_WEEK_PROFILE, response[1:]))
dicti['profile'] = response[-1].split(',')
self.week_profiles[dicti['week_profile_id']] = dicti
self.week_profiles[dicti['week_profile_id']] = cast(WeekProfileInfo, dicti)
_LOGGER.info('added/updated week profile: %s', dicti['name'])

elif response[0] in [nobo.API.RESPONSE_OVERRIDE_INFO, nobo.API.RESPONSE_ADD_OVERRIDE]:
dicti = collections.OrderedDict(zip(nobo.API.STRUCT_KEYS_OVERRIDE, response[1:]))
self.overrides[dicti['override_id']] = dicti
self.overrides[dicti['override_id']] = cast(OverrideInfo, dicti)
_LOGGER.info('added/updated override: id %s', dicti['override_id'])

elif response[0] in [nobo.API.RESPONSE_HUB_INFO, nobo.API.RESPONSE_UPDATE_HUB_INFO]:
self.hub_info = collections.OrderedDict(zip(nobo.API.STRUCT_KEYS_HUB, response[1:]))
self.hub_info = cast(HubInfo, collections.OrderedDict(zip(nobo.API.STRUCT_KEYS_HUB, response[1:])))
_LOGGER.info('updated hub info: %s', self.hub_info)
if response[0] == nobo.API.RESPONSE_HUB_INFO:
self._received_all_info = True
Expand Down Expand Up @@ -1111,7 +1176,7 @@ async def async_update_zone(
raise PynoboValidationError(f'Unknown zone id {zone_id}')

# Initialize command with the current zone settings
command = [nobo.API.UPDATE_ZONE] + list(self.zones[zone_id].values())
command: list[Any] = [nobo.API.UPDATE_ZONE] + list(self.zones[zone_id].values())

# Replace command with arguments that are not None.
if name:
Expand Down