Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,59 @@ not perform any I/O, and can safely be called from the event loop.
* get_current_zone_temperature - Get the current temperature from (the first component in) a zone
* get_zone_override_mode - Get the override mode for the zone

### Connection state

Consumers can observe when the hub connects, disconnects, or reconnects. The
`connected` property reflects the current state; register a callback to be
notified on every transition. Callbacks MUST be safe to call from the event
loop; exceptions they raise are logged and swallowed.

# Called with (hub, True) on connect/reconnect, (hub, False) on disconnect
def on_connection_state(hub, connected):
print("connected" if connected else "disconnected")

hub.register_connection_callback(on_connection_state)
await hub.connect()
assert hub.connected
# ...later, to stop listening:
hub.deregister_connection_callback(on_connection_state)

### Reconnect behavior

If the connection is lost, pynobo reconnects automatically. Consumers observe
transitions through the connection callback above (`True → False → True`); there
is no need to call `connect()` or `start()` again.

**What triggers a reconnect:**

* TCP errors on the receive socket (e.g. `ECONNRESET` after ~24 h, network
interface going away).
* Silent network drops — if no frame has arrived from the hub within 2× the
keep-alive interval (~28 s by default), the connection is forced closed and
the reconnect path runs. This covers cases where outgoing packets are
dropped without any error surfacing (WiFi disabled, switch unplugged, hub
unreachable).

**Retry schedule:** exponential backoff from 10 s up to 60 s, retrying
indefinitely while the failure is transport-level. The hub will reconnect as
soon as the network returns, regardless of outage length.

**Terminal failures:** if the hub rejects the handshake (wrong serial,
unsupported API version) during reconnect, pynobo logs the error and stops
the background tasks. The connection state stays `False` (set when the drop
was first detected) and is not recovered automatically — the consumer must
fix the configuration and call `connect()` / `start()` again.

**Log signals worth watching:**

* `lost connection to hub (...)` — a TCP error occurred; reconnect is starting.
* `no response from hub in 28s, forcing reconnect` — the liveness check
tripped (silent drop detected).
* `reconnect attempt failed: ...; retrying in Ns` — an individual attempt
failed; backoff continues.
* `reconnected to Nobø Hub` — back online.
* `hub rejected handshake, giving up: ...` — terminal; intervention required.

## Exceptions

Errors raised by pynobo inherit from `PynoboError`:
Expand Down
161 changes: 117 additions & 44 deletions pynobo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import errno
import logging
import threading
import time
import warnings
import socket
from typing import Any, Callable, Union
Expand All @@ -24,13 +25,23 @@
errno.ETIMEDOUT, # Happens if hub has not responded to handshake in 60 seconds, e.g. due to network issue
]

# Backoff schedule for reconnect_hub: first attempt waits RECONNECT_INITIAL_DELAY,
# each subsequent attempt doubles the delay up to RECONNECT_MAX_DELAY.
RECONNECT_INITIAL_DELAY = 10
RECONNECT_MAX_DELAY = 60


class PynoboError(Exception):
"""Base class for all pynobo errors."""


class PynoboConnectionError(PynoboError):
"""Raised when the TCP connection to the hub fails or is lost."""
class PynoboConnectionError(PynoboError, OSError):
"""Raised when the TCP connection to the hub fails or is lost.

Also inherits OSError for back-compat: existing consumers that catch
OSError from asyncio.open_connection (e.g. Home Assistant's nobo_hub
rediscovery fallback) continue to work transparently.
"""


class PynoboHandshakeError(PynoboError):
Expand Down Expand Up @@ -399,10 +410,13 @@ def __init__(
self.timezone = timezone

self._callbacks: list[Callable[["nobo"], None]] = []
self._connection_callbacks: list[Callable[["nobo", bool], None]] = []
self._connected: bool = False
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
self._keep_alive_task: asyncio.Task[None] | None = None
self._socket_receive_task: asyncio.Task[None] | None = None
self._last_recv_at: float = 0.0

self._received_all_info = False
self.hub_info = {}
Expand Down Expand Up @@ -450,6 +464,31 @@ def deregister_callback(self, callback: Callable[["nobo"], None] = lambda *args,
"""
self._callbacks.remove(callback)

@property
def connected(self) -> bool:
"""Whether the hub is currently connected."""
return self._connected

def register_connection_callback(self, callback: Callable[["nobo", bool], None]) -> None:
"""Register a callback invoked on connection-state transitions."""
self._connection_callbacks.append(callback)

def deregister_connection_callback(self, callback: Callable[["nobo", bool], None]) -> None:
"""Deregister a previously registered connection-state callback."""
if callback in self._connection_callbacks:
self._connection_callbacks.remove(callback)

def _set_connected(self, value: bool) -> None:
"""Update connection state and fire callbacks on transitions."""
if self._connected == value:
return
self._connected = value
for cb in list(self._connection_callbacks):
try:
cb(self, value)
except Exception:
_LOGGER.exception("Connection-state callback raised")

async def connect(self) -> None:
"""Connect to Ecohub, either by scanning or directly."""
connected = False
Expand Down Expand Up @@ -501,7 +540,7 @@ async def stop(self) -> None:
if self._socket_receive_task:
self._socket_receive_task.cancel()
with suppress(asyncio.CancelledError):
await self._keep_alive_task
await self._socket_receive_task
await self.close()
_LOGGER.info('disconnected from Nobø Ecohub')

Expand All @@ -513,6 +552,7 @@ async def close(self) -> None:
await self._writer.wait_closed()
self._writer = None
_LOGGER.info('connection closed')
self._set_connected(False)

def connect_hub(self, ip: str, serial: str) -> bool:
warnings.warn(
Expand Down Expand Up @@ -580,6 +620,11 @@ async def async_connect_hub(self, ip: str, serial: str) -> bool:
await asyncio.wait_for(self._get_initial_data(), timeout=5)
except asyncio.TimeoutError as e:
raise PynoboConnectionError(f'Timed out waiting for initial data from {ip}') from e
# Fire connection callback before data callback so consumers
# that gate on `connected` see the transition before the data
# arrives and don't have to handle a "data while disconnected"
# window during reconnect.
self._set_connected(True)
for callback in self._callbacks:
callback(self)
return True
Expand All @@ -605,44 +650,46 @@ async def async_connect_hub(self, ip: str, serial: str) -> bool:
raise PynoboHandshakeError(f'connection to hub rejected: {response}')

async def reconnect_hub(self) -> None:
"""Attempt to reconnect to the hub."""
"""Keep trying to reconnect to the hub, with exponential backoff.

Retries indefinitely on transport-level failures (network down, hub
unreachable). Handshake-level rejection (PynoboHandshakeError) is not
caught here — it propagates out so an unrecoverable error isn't
silently retried forever.
"""
_LOGGER.info('reconnecting to hub')
# Pause keep alive during reconnect
self._keep_alive = False
# TODO: set timeout?
if self.discover:
# Reconnect using complete serial, but allow ip to change unless originally provided
discovered_hubs = await self.async_discover_hubs(ip=self.ip, serial=self.hub_serial, rediscover=True)
while discovered_hubs:
(discover_ip, discover_serial) = discovered_hubs.pop()
try:
connected = await self.async_connect_hub(discover_ip, discover_serial)
if connected:
break
except OSError as e:
# We know we should be able to connect, because we just discovered the IP address. However, if
# the connection was lost due to network problems on our host, we must wait until we have a local
# IP address. E.g. discovery may find Nobø Ecohub before DHCP address is assigned.
if e.errno in RECONNECT_ERRORS:
_LOGGER.warning("Failed to connect to ip %s: %s", discover_ip, e)
discovered_hubs.add( (discover_ip, discover_serial) )
await asyncio.sleep(1)
else:
raise PynoboConnectionError(f'Failed to reconnect to Nobø Ecohub at {discover_ip}: {e}') from e
else:
connected = False
while not connected:
_LOGGER.debug('Discovery disabled - waiting 10 seconds before trying to reconnect.')
await asyncio.sleep(10)
with suppress(asyncio.TimeoutError):
try:
connected = await self.async_connect_hub(self.ip, self.serial)
except OSError as e:
if e.errno in RECONNECT_ERRORS:
_LOGGER.debug('Ignoring %s', e)
else:
raise PynoboConnectionError(f'Failed to reconnect to Nobø Ecohub at {self.ip}: {e}') from e
delay = RECONNECT_INITIAL_DELAY
while True:
_LOGGER.debug('waiting %ds before next reconnect attempt', delay)
await asyncio.sleep(delay)
try:
if self.discover:
# Reconnect using complete serial, but allow ip to change unless originally provided
discovered_hubs = await self.async_discover_hubs(
ip=self.ip, serial=self.hub_serial, rediscover=True,
)
connected = False
while discovered_hubs and not connected:
(discover_ip, discover_serial) = discovered_hubs.pop()
try:
connected = await self.async_connect_hub(discover_ip, discover_serial)
except PynoboConnectionError as inner:
_LOGGER.warning("Failed to connect to %s: %s", discover_ip, inner)
else:
connected = await self.async_connect_hub(self.ip, self.serial)
except PynoboHandshakeError:
raise # unrecoverable — propagate so socket_receive's outer arm can stop() us
except PynoboConnectionError as e:
_LOGGER.info(
"reconnect attempt failed: %s; retrying in %ds",
e, min(delay * 2, RECONNECT_MAX_DELAY),
)
connected = False
if connected:
break
delay = min(delay * 2, RECONNECT_MAX_DELAY)

self._keep_alive = True
_LOGGER.info('reconnected to Nobø Hub')
Expand All @@ -651,7 +698,7 @@ async def reconnect_hub(self) -> None:
def discover_hubs(
serial: str = "",
ip: str | None = None,
autodiscover_wait: float = 3.0,
autodiscover_wait: float = 5.0,
loop: asyncio.AbstractEventLoop | None = None,
) -> set[tuple[str, str]]:
warnings.warn(
Expand All @@ -671,7 +718,7 @@ def discover_hubs(
async def async_discover_hubs(
serial: str = "",
ip: str | None = None,
autodiscover_wait: float = 3.0,
autodiscover_wait: float = 5.0,
loop: asyncio.AbstractEventLoop | None = None,
rediscover: bool = False,
) -> set[tuple[str, str]]:
Expand All @@ -694,7 +741,7 @@ async def async_discover_hubs(

:param serial: The last 3 digits of the Ecohub serial number or the complete 12 digit serial number
:param ip: ip address to search for Ecohub at (default None)
:param autodiscover_wait: how long to wait for an autodiscover package from the hub (default 3.0)
:param autodiscover_wait: how long to wait for an autodiscover package from the hub (default 5.0)
:param loop: deprecated
:param rediscover: if true, run until the hub is discovered

Expand Down Expand Up @@ -734,17 +781,29 @@ def _reuse_port() -> bool:
pass
return False

async def keep_alive(self, interval: int = 14) -> None:
async def keep_alive(self, interval: float = 14) -> None:
"""
Send a periodic handshake. Needs to be sent every < 30 sec, preferably every 14 seconds.

:param interval: seconds between each handshake. Default 14.
"""
self._keep_alive = True
self._last_recv_at = time.monotonic()
while True:
await asyncio.sleep(interval)
if self._keep_alive:
await self.async_send_command([nobo.API.HANDSHAKE])
if not self._keep_alive:
continue
# If nothing has come back from the hub within 2× the interval, the link is
# dead (e.g. silent network drop — WiFi off, hub unplugged). Close the writer
# to force readuntil in socket_receive() to EOF, routing into reconnect_hub().
# NOTE: this liveness check relies on the hub echoing HANDSHAKE at the app layer.
# If HANDSHAKE is ever replaced with the spec's KEEPALIVE, investigate how the
# message is acknowledged by the hub.
if time.monotonic() - self._last_recv_at > 2 * interval:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This general solution is prone to race condition, but is saved since asyncio is single threaded. Any reason not to go for 3× interval? Better example for others or if someone introduces multi threading in the future?

Thoughts for myself to remember risk in future:

If reconnect goes through (_last_recv_at = 0), then this loop runs before self._keep_alive gets set (_last_recv_at = 0+x), it waits for another 14 seconds, then sends a HANDSHAKE, waits for another 14 seconds before checking _last_recv_at = 0+x+2*14.
Depending on how bad your luck is (and a bit how fast your computer is). You may now close the connection without a actual timeout.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason to bring back multithreading instead of async/await. As you state, it is much more prone to race conditions.

I have given the timeout interval quite some thought. On my local network, I always get a reply from HANDSHAKE within milliseconds. Even with a poor remote connection (e.g. mobile link to a cabin in the mountains) , you should expect a reply within about 10 seconds, worst case around 15 seconds.

Per spec, pynobo should send KEEPALIVE to the hub every 14 seconds. However, this message does not include a reply, which I guess is why HANDSHAKE was chosen instead (I believe this is from original implementation). The hub will disconnect if it does not see a message for 30 seconds - doesn't matter what message as far as I understand it.

On my local network, I could have had a timeout of 1 second, but it's unnecessary complicated to have dynamic timeout depending on connectivity. 2 x interval (=28s) is a reasonable compromise. 3 x interval (=42s) seems longer than necessary to me.

_LOGGER.info('no response from hub in %ss, forcing reconnect', 2 * interval)
await self.close()
continue
await self.async_send_command([nobo.API.HANDSHAKE])

def _create_task(self, target: Any) -> None:
try:
Expand Down Expand Up @@ -806,6 +865,7 @@ async def get_response(self) -> list[str]:
_LOGGER.info('lost connection to hub (%s)', e)
await self.close()
raise PynoboConnectionError(f'Lost connection to Nobø Ecohub: {e}') from e
self._last_recv_at = time.monotonic()
response = message.decode('utf-8').split(' ')
_LOGGER.debug('received: %s', response)
return response
Expand All @@ -824,18 +884,31 @@ async def socket_receive(self) -> None:
self.response_handler(response)
for callback in self._callbacks:
callback(self)
except (asyncio.IncompleteReadError) as e:
except asyncio.IncompleteReadError:
_LOGGER.info('connection to hub closed by peer; reconnecting')
self._set_connected(False)
await self.reconnect_hub()
except PynoboConnectionError as e:
# get_response() wraps ConnectionError from readuntil as PynoboConnectionError,
# so route it through the reconnect path here.
_LOGGER.info('Reconnecting due to %s', e)
self._set_connected(False)
await self.reconnect_hub()
except (OSError) as e:
if e.errno in RECONNECT_ERRORS:
_LOGGER.info('Reconnecting due to %s', e)
self._set_connected(False)
await self.reconnect_hub()
else:
# Caught by the outer `except Exception` below, so don't need to wrap.
raise e
except asyncio.CancelledError:
_LOGGER.debug('socket_receive stopped')
except PynoboHandshakeError as e:
# Unrecoverable — the hub rejected us (bad serial, version mismatch, etc.).
# Log cleanly instead of as an "Unhandled exception" traceback.
_LOGGER.error('hub rejected handshake, giving up: %s', e)
await self.stop()
except Exception as e:
# Ops, now we have real problems
_LOGGER.error('Unhandled exception %s', e, exc_info=1)
Expand Down
Loading
Loading