diff --git a/README.md b/README.md index 40cb9a3..21468d9 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,59 @@ not perform any I/O, and can safely be called from the event loop. * get_current_zone_temperature - Get the current temperature from (the first component in) a zone * get_zone_override_mode - Get the override mode for the zone +### Connection state + +Consumers can observe when the hub connects, disconnects, or reconnects. The +`connected` property reflects the current state; register a callback to be +notified on every transition. Callbacks MUST be safe to call from the event +loop; exceptions they raise are logged and swallowed. + + # Called with (hub, True) on connect/reconnect, (hub, False) on disconnect + def on_connection_state(hub, connected): + print("connected" if connected else "disconnected") + + hub.register_connection_callback(on_connection_state) + await hub.connect() + assert hub.connected + # ...later, to stop listening: + hub.deregister_connection_callback(on_connection_state) + +### Reconnect behavior + +If the connection is lost, pynobo reconnects automatically. Consumers observe +transitions through the connection callback above (`True → False → True`); there +is no need to call `connect()` or `start()` again. + +**What triggers a reconnect:** + +* TCP errors on the receive socket (e.g. `ECONNRESET` after ~24 h, network + interface going away). +* Silent network drops — if no frame has arrived from the hub within 2× the + keep-alive interval (~28 s by default), the connection is forced closed and + the reconnect path runs. This covers cases where outgoing packets are + dropped without any error surfacing (WiFi disabled, switch unplugged, hub + unreachable). + +**Retry schedule:** exponential backoff from 10 s up to 60 s, retrying +indefinitely while the failure is transport-level. The hub will reconnect as +soon as the network returns, regardless of outage length. + +**Terminal failures:** if the hub rejects the handshake (wrong serial, +unsupported API version) during reconnect, pynobo logs the error and stops +the background tasks. The connection state stays `False` (set when the drop +was first detected) and is not recovered automatically — the consumer must +fix the configuration and call `connect()` / `start()` again. + +**Log signals worth watching:** + +* `lost connection to hub (...)` — a TCP error occurred; reconnect is starting. +* `no response from hub in 28s, forcing reconnect` — the liveness check + tripped (silent drop detected). +* `reconnect attempt failed: ...; retrying in Ns` — an individual attempt + failed; backoff continues. +* `reconnected to Nobø Hub` — back online. +* `hub rejected handshake, giving up: ...` — terminal; intervention required. + ## Exceptions Errors raised by pynobo inherit from `PynoboError`: diff --git a/pynobo/__init__.py b/pynobo/__init__.py index 9562d9e..ab36e83 100644 --- a/pynobo/__init__.py +++ b/pynobo/__init__.py @@ -7,6 +7,7 @@ import errno import logging import threading +import time import warnings import socket from typing import Any, Callable, Union @@ -24,13 +25,23 @@ errno.ETIMEDOUT, # Happens if hub has not responded to handshake in 60 seconds, e.g. due to network issue ] +# Backoff schedule for reconnect_hub: first attempt waits RECONNECT_INITIAL_DELAY, +# each subsequent attempt doubles the delay up to RECONNECT_MAX_DELAY. +RECONNECT_INITIAL_DELAY = 10 +RECONNECT_MAX_DELAY = 60 + class PynoboError(Exception): """Base class for all pynobo errors.""" -class PynoboConnectionError(PynoboError): - """Raised when the TCP connection to the hub fails or is lost.""" +class PynoboConnectionError(PynoboError, OSError): + """Raised when the TCP connection to the hub fails or is lost. + + Also inherits OSError for back-compat: existing consumers that catch + OSError from asyncio.open_connection (e.g. Home Assistant's nobo_hub + rediscovery fallback) continue to work transparently. + """ class PynoboHandshakeError(PynoboError): @@ -399,10 +410,13 @@ def __init__( self.timezone = timezone self._callbacks: list[Callable[["nobo"], None]] = [] + self._connection_callbacks: list[Callable[["nobo", bool], None]] = [] + self._connected: bool = False self._reader: asyncio.StreamReader | None = None self._writer: asyncio.StreamWriter | None = None self._keep_alive_task: asyncio.Task[None] | None = None self._socket_receive_task: asyncio.Task[None] | None = None + self._last_recv_at: float = 0.0 self._received_all_info = False self.hub_info = {} @@ -450,6 +464,31 @@ def deregister_callback(self, callback: Callable[["nobo"], None] = lambda *args, """ self._callbacks.remove(callback) + @property + def connected(self) -> bool: + """Whether the hub is currently connected.""" + return self._connected + + def register_connection_callback(self, callback: Callable[["nobo", bool], None]) -> None: + """Register a callback invoked on connection-state transitions.""" + self._connection_callbacks.append(callback) + + def deregister_connection_callback(self, callback: Callable[["nobo", bool], None]) -> None: + """Deregister a previously registered connection-state callback.""" + if callback in self._connection_callbacks: + self._connection_callbacks.remove(callback) + + def _set_connected(self, value: bool) -> None: + """Update connection state and fire callbacks on transitions.""" + if self._connected == value: + return + self._connected = value + for cb in list(self._connection_callbacks): + try: + cb(self, value) + except Exception: + _LOGGER.exception("Connection-state callback raised") + async def connect(self) -> None: """Connect to Ecohub, either by scanning or directly.""" connected = False @@ -501,7 +540,7 @@ async def stop(self) -> None: if self._socket_receive_task: self._socket_receive_task.cancel() with suppress(asyncio.CancelledError): - await self._keep_alive_task + await self._socket_receive_task await self.close() _LOGGER.info('disconnected from Nobø Ecohub') @@ -513,6 +552,7 @@ async def close(self) -> None: await self._writer.wait_closed() self._writer = None _LOGGER.info('connection closed') + self._set_connected(False) def connect_hub(self, ip: str, serial: str) -> bool: warnings.warn( @@ -580,6 +620,11 @@ async def async_connect_hub(self, ip: str, serial: str) -> bool: await asyncio.wait_for(self._get_initial_data(), timeout=5) except asyncio.TimeoutError as e: raise PynoboConnectionError(f'Timed out waiting for initial data from {ip}') from e + # Fire connection callback before data callback so consumers + # that gate on `connected` see the transition before the data + # arrives and don't have to handle a "data while disconnected" + # window during reconnect. + self._set_connected(True) for callback in self._callbacks: callback(self) return True @@ -605,44 +650,46 @@ async def async_connect_hub(self, ip: str, serial: str) -> bool: raise PynoboHandshakeError(f'connection to hub rejected: {response}') async def reconnect_hub(self) -> None: - """Attempt to reconnect to the hub.""" + """Keep trying to reconnect to the hub, with exponential backoff. + Retries indefinitely on transport-level failures (network down, hub + unreachable). Handshake-level rejection (PynoboHandshakeError) is not + caught here — it propagates out so an unrecoverable error isn't + silently retried forever. + """ _LOGGER.info('reconnecting to hub') # Pause keep alive during reconnect self._keep_alive = False - # TODO: set timeout? - if self.discover: - # Reconnect using complete serial, but allow ip to change unless originally provided - discovered_hubs = await self.async_discover_hubs(ip=self.ip, serial=self.hub_serial, rediscover=True) - while discovered_hubs: - (discover_ip, discover_serial) = discovered_hubs.pop() - try: - connected = await self.async_connect_hub(discover_ip, discover_serial) - if connected: - break - except OSError as e: - # We know we should be able to connect, because we just discovered the IP address. However, if - # the connection was lost due to network problems on our host, we must wait until we have a local - # IP address. E.g. discovery may find Nobø Ecohub before DHCP address is assigned. - if e.errno in RECONNECT_ERRORS: - _LOGGER.warning("Failed to connect to ip %s: %s", discover_ip, e) - discovered_hubs.add( (discover_ip, discover_serial) ) - await asyncio.sleep(1) - else: - raise PynoboConnectionError(f'Failed to reconnect to Nobø Ecohub at {discover_ip}: {e}') from e - else: - connected = False - while not connected: - _LOGGER.debug('Discovery disabled - waiting 10 seconds before trying to reconnect.') - await asyncio.sleep(10) - with suppress(asyncio.TimeoutError): - try: - connected = await self.async_connect_hub(self.ip, self.serial) - except OSError as e: - if e.errno in RECONNECT_ERRORS: - _LOGGER.debug('Ignoring %s', e) - else: - raise PynoboConnectionError(f'Failed to reconnect to Nobø Ecohub at {self.ip}: {e}') from e + delay = RECONNECT_INITIAL_DELAY + while True: + _LOGGER.debug('waiting %ds before next reconnect attempt', delay) + await asyncio.sleep(delay) + try: + if self.discover: + # Reconnect using complete serial, but allow ip to change unless originally provided + discovered_hubs = await self.async_discover_hubs( + ip=self.ip, serial=self.hub_serial, rediscover=True, + ) + connected = False + while discovered_hubs and not connected: + (discover_ip, discover_serial) = discovered_hubs.pop() + try: + connected = await self.async_connect_hub(discover_ip, discover_serial) + except PynoboConnectionError as inner: + _LOGGER.warning("Failed to connect to %s: %s", discover_ip, inner) + else: + connected = await self.async_connect_hub(self.ip, self.serial) + except PynoboHandshakeError: + raise # unrecoverable — propagate so socket_receive's outer arm can stop() us + except PynoboConnectionError as e: + _LOGGER.info( + "reconnect attempt failed: %s; retrying in %ds", + e, min(delay * 2, RECONNECT_MAX_DELAY), + ) + connected = False + if connected: + break + delay = min(delay * 2, RECONNECT_MAX_DELAY) self._keep_alive = True _LOGGER.info('reconnected to Nobø Hub') @@ -651,7 +698,7 @@ async def reconnect_hub(self) -> None: def discover_hubs( serial: str = "", ip: str | None = None, - autodiscover_wait: float = 3.0, + autodiscover_wait: float = 5.0, loop: asyncio.AbstractEventLoop | None = None, ) -> set[tuple[str, str]]: warnings.warn( @@ -671,7 +718,7 @@ def discover_hubs( async def async_discover_hubs( serial: str = "", ip: str | None = None, - autodiscover_wait: float = 3.0, + autodiscover_wait: float = 5.0, loop: asyncio.AbstractEventLoop | None = None, rediscover: bool = False, ) -> set[tuple[str, str]]: @@ -694,7 +741,7 @@ async def async_discover_hubs( :param serial: The last 3 digits of the Ecohub serial number or the complete 12 digit serial number :param ip: ip address to search for Ecohub at (default None) - :param autodiscover_wait: how long to wait for an autodiscover package from the hub (default 3.0) + :param autodiscover_wait: how long to wait for an autodiscover package from the hub (default 5.0) :param loop: deprecated :param rediscover: if true, run until the hub is discovered @@ -734,17 +781,29 @@ def _reuse_port() -> bool: pass return False - async def keep_alive(self, interval: int = 14) -> None: + async def keep_alive(self, interval: float = 14) -> None: """ Send a periodic handshake. Needs to be sent every < 30 sec, preferably every 14 seconds. :param interval: seconds between each handshake. Default 14. """ self._keep_alive = True + self._last_recv_at = time.monotonic() while True: await asyncio.sleep(interval) - if self._keep_alive: - await self.async_send_command([nobo.API.HANDSHAKE]) + if not self._keep_alive: + continue + # If nothing has come back from the hub within 2× the interval, the link is + # dead (e.g. silent network drop — WiFi off, hub unplugged). Close the writer + # to force readuntil in socket_receive() to EOF, routing into reconnect_hub(). + # NOTE: this liveness check relies on the hub echoing HANDSHAKE at the app layer. + # If HANDSHAKE is ever replaced with the spec's KEEPALIVE, investigate how the + # message is acknowledged by the hub. + if time.monotonic() - self._last_recv_at > 2 * interval: + _LOGGER.info('no response from hub in %ss, forcing reconnect', 2 * interval) + await self.close() + continue + await self.async_send_command([nobo.API.HANDSHAKE]) def _create_task(self, target: Any) -> None: try: @@ -806,6 +865,7 @@ async def get_response(self) -> list[str]: _LOGGER.info('lost connection to hub (%s)', e) await self.close() raise PynoboConnectionError(f'Lost connection to Nobø Ecohub: {e}') from e + self._last_recv_at = time.monotonic() response = message.decode('utf-8').split(' ') _LOGGER.debug('received: %s', response) return response @@ -824,18 +884,31 @@ async def socket_receive(self) -> None: self.response_handler(response) for callback in self._callbacks: callback(self) - except (asyncio.IncompleteReadError) as e: + except asyncio.IncompleteReadError: + _LOGGER.info('connection to hub closed by peer; reconnecting') + self._set_connected(False) + await self.reconnect_hub() + except PynoboConnectionError as e: + # get_response() wraps ConnectionError from readuntil as PynoboConnectionError, + # so route it through the reconnect path here. _LOGGER.info('Reconnecting due to %s', e) + self._set_connected(False) await self.reconnect_hub() except (OSError) as e: if e.errno in RECONNECT_ERRORS: _LOGGER.info('Reconnecting due to %s', e) + self._set_connected(False) await self.reconnect_hub() else: # Caught by the outer `except Exception` below, so don't need to wrap. raise e except asyncio.CancelledError: _LOGGER.debug('socket_receive stopped') + except PynoboHandshakeError as e: + # Unrecoverable — the hub rejected us (bad serial, version mismatch, etc.). + # Log cleanly instead of as an "Unhandled exception" traceback. + _LOGGER.error('hub rejected handshake, giving up: %s', e) + await self.stop() except Exception as e: # Ops, now we have real problems _LOGGER.error('Unhandled exception %s', e, exc_info=1) diff --git a/test_pynobo.py b/test_pynobo.py index 1e899a2..997f6f1 100644 --- a/test_pynobo.py +++ b/test_pynobo.py @@ -1,5 +1,9 @@ +import asyncio +import errno import pathlib import unittest +from contextlib import suppress +from unittest.mock import AsyncMock, MagicMock, patch from pynobo import ( PynoboConnectionError, @@ -84,5 +88,338 @@ def test_py_typed_file_is_present_in_source(self): self.assertTrue(marker.is_file(), f"{marker} is missing") +class TestConnectionStateAPI(unittest.TestCase): + + def _make_hub(self): + return nobo('123', discover=False, synchronous=False) + + def test_initial_state_is_disconnected(self): + hub = self._make_hub() + events = [] + hub.register_connection_callback(lambda h, s: events.append(s)) + self.assertFalse(hub.connected) + self.assertEqual(events, []) + + def test_transition_fires_callback_once(self): + hub = self._make_hub() + events = [] + hub.register_connection_callback(lambda h, s: events.append((h is hub, s))) + hub._set_connected(True) + self.assertTrue(hub.connected) + self.assertEqual(events, [(True, True)]) + + def test_transitions_are_idempotent(self): + hub = self._make_hub() + events = [] + hub.register_connection_callback(lambda h, s: events.append(s)) + hub._set_connected(True) + hub._set_connected(True) + hub._set_connected(False) + hub._set_connected(False) + self.assertEqual(events, [True, False]) + + def test_deregister_stops_callback(self): + hub = self._make_hub() + events = [] + cb = lambda h, s: events.append(s) + hub.register_connection_callback(cb) + hub._set_connected(True) + hub.deregister_connection_callback(cb) + hub._set_connected(False) + self.assertEqual(events, [True]) + + def test_callback_exception_does_not_break_dispatch(self): + hub = self._make_hub() + events = [] + def boom(h, s): + raise RuntimeError("boom") + hub.register_connection_callback(boom) + hub.register_connection_callback(lambda h, s: events.append(s)) + hub._set_connected(True) + self.assertEqual(events, [True]) + self.assertTrue(hub.connected) + + +class TestSocketReceiveReconnect(unittest.IsolatedAsyncioTestCase): + + def _make_hub(self): + return nobo('123', discover=False, synchronous=False) + + def _install_fake_transport(self, hub, errors_to_raise): + """Wire fake reader/writer + spies for reconnect_hub / stop. + + The reader raises each exception in `errors_to_raise` on successive + readuntil() calls, then hangs so the task stays alive until cancelled. + Returns (reconnect_calls, stop_calls, reconnect_done_event). + """ + errors = iter(errors_to_raise) + hang = asyncio.Event() # never set + + reader = MagicMock(spec=asyncio.StreamReader) + + async def readuntil(_sep): + try: + raise next(errors) + except StopIteration: + await hang.wait() + return b'\r' # unreachable + + reader.readuntil = readuntil + + writer = MagicMock(spec=asyncio.StreamWriter) + writer.close = MagicMock() + writer.wait_closed = AsyncMock() + + hub._reader = reader + hub._writer = writer + + reconnect_calls = [] + reconnect_done = asyncio.Event() + + async def fake_reconnect(): + reconnect_calls.append(True) + hub._set_connected(True) + reconnect_done.set() + + hub.reconnect_hub = fake_reconnect + + stop_calls = [] + + async def fake_stop(): + stop_calls.append(True) + + hub.stop = fake_stop + + return reconnect_calls, stop_calls, reconnect_done + + async def _run_socket_receive_until_reconnect(self, hub, reconnect_done): + task = asyncio.create_task(hub.socket_receive()) + try: + await asyncio.wait_for(reconnect_done.wait(), timeout=1) + finally: + task.cancel() + with suppress(asyncio.CancelledError): + await task + + async def test_reconnect_on_econnreset(self): + """ConnectionResetError from readuntil must route through reconnect_hub, not stop().""" + hub = self._make_hub() + err = ConnectionResetError(errno.ECONNRESET, "Connection reset by peer") + reconnect_calls, stop_calls, reconnect_done = self._install_fake_transport(hub, [err]) + + await self._run_socket_receive_until_reconnect(hub, reconnect_done) + + self.assertEqual(len(reconnect_calls), 1) + self.assertEqual(stop_calls, []) + + async def test_reconnect_on_oserror_with_reconnect_errno(self): + """Plain OSError with an errno in RECONNECT_ERRORS must also trigger reconnect.""" + hub = self._make_hub() + err = OSError(errno.ETIMEDOUT, "Timed out") + reconnect_calls, stop_calls, reconnect_done = self._install_fake_transport(hub, [err]) + + await self._run_socket_receive_until_reconnect(hub, reconnect_done) + + self.assertEqual(len(reconnect_calls), 1) + self.assertEqual(stop_calls, []) + + async def test_connection_callback_transitions_on_disconnect_reconnect(self): + """Callback must see True → False → True across a drop and successful reconnect.""" + hub = self._make_hub() + events = [] + hub.register_connection_callback(lambda _h, state: events.append(state)) + hub._set_connected(True) # initial connected state + + err = ConnectionResetError(errno.ECONNRESET, "Connection reset by peer") + _reconnect_calls, _stop_calls, reconnect_done = self._install_fake_transport(hub, [err]) + + await self._run_socket_receive_until_reconnect(hub, reconnect_done) + + self.assertEqual(events, [True, False, True]) + + async def test_socket_receive_stops_cleanly_on_handshake_error(self): + """Terminal handshake rejection during reconnect routes to outer arm → stop(). + + Complements test_reconnect_hub_propagates_handshake_error: that one verifies + reconnect_hub lets the exception escape; this one verifies socket_receive's + outer arm actually catches it, logs cleanly, and exits via stop() instead of + the misleading "Unhandled exception" catch-all. + """ + hub = self._make_hub() + err = ConnectionResetError(errno.ECONNRESET, "Connection reset by peer") + self._install_fake_transport(hub, [err]) + # Override fake reconnect: this time reconnect itself fails terminally. + hub.reconnect_hub = AsyncMock(side_effect=PynoboHandshakeError("rejected")) + + stop_calls = [] + + async def fake_stop(): + stop_calls.append(True) + + hub.stop = fake_stop + + # socket_receive should exit on its own — no external cancel needed. + await asyncio.wait_for(hub.socket_receive(), timeout=1) + + self.assertEqual(stop_calls, [True]) + + async def test_reconnect_hub_propagates_handshake_error(self): + """Handshake rejection must escape the retry loop so it surfaces to the outer handler. + + PynoboHandshakeError means the hub is refusing us (bad serial, version mismatch) — + no amount of retrying will fix that. It should propagate out for socket_receive's + outer arm to log cleanly and stop. + """ + hub = self._make_hub() + call_count = {'n': 0} + + async def fake_connect(_ip, _serial): + call_count['n'] += 1 + raise PynoboHandshakeError("hub rejected handshake") + + hub.async_connect_hub = fake_connect + + with patch('pynobo.asyncio.sleep', new_callable=AsyncMock): + with self.assertRaises(PynoboHandshakeError): + await hub.reconnect_hub() + + self.assertEqual(call_count['n'], 1) + self.assertFalse(hub.connected) + + async def test_reconnect_hub_retries_on_transient_failure(self): + """reconnect_hub must keep retrying when async_connect_hub raises PynoboConnectionError. + + The pre-fix code caught only OSError, but async_connect_hub wraps OSError from + open_connection into PynoboConnectionError — so the first failed attempt escaped, + bounced out of socket_receive as Unhandled exception, and called stop(). + """ + hub = self._make_hub() + events = [] + hub.register_connection_callback(lambda _h, state: events.append(state)) + + call_count = {'n': 0} + + async def fake_connect(_ip, _serial): + call_count['n'] += 1 + if call_count['n'] < 3: + raise PynoboConnectionError("transient network failure") + # Mirror real async_connect_hub: fire the connection callback on + # success before returning, so reconnect_hub doesn't have to. + hub._set_connected(True) + return True + + hub.async_connect_hub = fake_connect + + # Patch sleep so the test doesn't wait through the real backoff (10 + 20 s). + with patch('pynobo.asyncio.sleep', new_callable=AsyncMock): + await hub.reconnect_hub() + + self.assertEqual(call_count['n'], 3) + # Only one transition fires: the final True on success. No intermediate False — + # reconnect_hub doesn't touch state until it has a good connection. + self.assertEqual(events, [True]) + self.assertTrue(hub.connected) + + async def test_liveness_deadline_triggers_reconnect_on_silent_drop(self): + """If no frame arrives within 2× the keep-alive interval, force a reconnect. + + Models the silent-network-drop case: readuntil hangs (packets go nowhere, + nothing comes back). keep_alive's liveness check should close the writer, + which unblocks readuntil with EOF → IncompleteReadError → reconnect_hub. + """ + hub = self._make_hub() + events = [] + hub.register_connection_callback(lambda _h, state: events.append(state)) + hub._set_connected(True) + + reader = MagicMock(spec=asyncio.StreamReader) + hang = asyncio.Event() # released by close() to surface EOF + + async def readuntil(_sep): + await hang.wait() + raise asyncio.IncompleteReadError(partial=b'', expected=1) + + reader.readuntil = readuntil + + writer = MagicMock(spec=asyncio.StreamWriter) + + def close_writer(): + # After close, any subsequent readuntil (post-reconnect) must hang + # forever so the test loop doesn't spin. + async def hang_forever(_sep): + await asyncio.Event().wait() + reader.readuntil = hang_forever + hang.set() # wakes the already-pending readuntil, which raises EOF + + writer.close = MagicMock(side_effect=close_writer) + writer.wait_closed = AsyncMock() + hub._reader = reader + hub._writer = writer + + reconnect_done = asyncio.Event() + reconnect_calls = [] + + async def fake_reconnect(): + reconnect_calls.append(True) + hub._set_connected(True) + reconnect_done.set() + + hub.reconnect_hub = fake_reconnect + hub.stop = AsyncMock() + + recv_task = asyncio.create_task(hub.socket_receive()) + keep_task = asyncio.create_task(hub.keep_alive(interval=0.1)) + try: + await asyncio.wait_for(reconnect_done.wait(), timeout=2) + finally: + recv_task.cancel() + keep_task.cancel() + with suppress(asyncio.CancelledError): + await recv_task + with suppress(asyncio.CancelledError): + await keep_task + + self.assertEqual(len(reconnect_calls), 1) + self.assertEqual(events, [True, False, True]) + + async def test_connect_fires_connection_callback_before_data_callback(self): + """async_connect_hub fires the connection callback before the data callback. + + Consumers (notably HA's nobo_hub integration) gate availability on the + connection callback. Reordering them would re-introduce the race where + a data callback fires while the consumer still thinks the hub is + disconnected. + """ + hub = nobo('123456789012', ip='192.168.1.1', discover=False, synchronous=False) + order = [] + hub.register_callback(lambda _h: order.append('data')) + hub.register_connection_callback(lambda _h, _s: order.append('connection')) + + reader = MagicMock(spec=asyncio.StreamReader) + writer = MagicMock(spec=asyncio.StreamWriter) + writer.close = MagicMock() + writer.wait_closed = AsyncMock() + + # Successful handshake responses, then nothing (handshake-only path). + responses = iter([ + ['HELLO', nobo.API.VERSION, '123456789012', '20260101000000'], + ['HANDSHAKE'], + ]) + + async def fake_get_response(): + return next(responses) + + hub.get_response = fake_get_response + hub.async_send_command = AsyncMock() + hub._get_initial_data = AsyncMock() + + with patch('pynobo.asyncio.open_connection', + new=AsyncMock(return_value=(reader, writer))): + result = await hub.async_connect_hub('192.168.1.1', '123456789012') + + self.assertTrue(result) + self.assertEqual(order, ['connection', 'data']) + + if __name__ == '__main__': unittest.main()