Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 61 additions & 5 deletions pytradfri/api/aiocoap_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import json
import logging
import socket
import time

from aiocoap import Message, Context
from aiocoap.error import RequestTimedOut, Error, ConstructionRenderableError
from aiocoap.numbers.codes import Code
from aiocoap.transports import tinydtls

from ..const import OBSERVATION_SLEEP_TIME, OBSERVATION_TIMEOUT
from ..error import ClientError, ServerError, RequestTimeout
from ..gateway import Gateway

Expand All @@ -30,12 +32,16 @@ def _get_psk(self, host, port):


class APIFactory:
last_changed = time.time()

def __init__(self, host, psk_id='pytradfri', psk=None, loop=None):
self._psk = psk
self._host = host
self._psk_id = psk_id
self._loop = loop
self._observations_err_callbacks = []
self._is_checking = False
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you expecting a race of some sort? Maybe we should consider using asyncio.Lock instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not specifically a race condition. But because the asyncio.sleep function is called within _remove_timedout_observations, it can happen that an observation is checked multiple times. This shouldn't cause any problems, but is just unnecessary. Using an asyncio.Lock will have the same behaviour, but just sequential instead of parallel.

This check is only done to make sure we check and reset the observations only once at a time, even when controlling multiple lights at once.

self._is_resetting = False
self._observations = []
self._protocol = None

if self._loop is None:
Expand Down Expand Up @@ -78,9 +84,10 @@ async def _reset_protocol(self, exc=None):
await protocol.shutdown()
self._protocol = None
# Let any observers know the protocol has been shutdown.
for ob_error in self._observations_err_callbacks:
ob_error(exc)
self._observations_err_callbacks.clear()
while self._observations:
ob = self._observations.pop()
ob.cancel()
del ob

async def shutdown(self, exc=None):
"""Shutdown the API events.
Expand Down Expand Up @@ -140,17 +147,23 @@ async def _execute(self, api_command):
api_method = Code.FETCH
elif method == 'patch':
api_method = Code.PATCH
elif method is None:
return

msg = Message(code=api_method, uri=url, **kwargs)

_, res = await self._get_response(msg)

api_command.result = _process_output(res, parse_json)
self._loop.create_task(self._remove_timedout_observations())

return api_command.result

async def request(self, api_commands):
"""Make a request."""
if not api_commands:
return None

if not isinstance(api_commands, list):
result = await self._execute(api_commands)
return result
Expand All @@ -175,14 +188,49 @@ async def _observe(self, api_command):

def success_callback(res):
api_command.result = _process_output(res)
APIFactory.update_last_changed()

def error_callback(ex):
err_callback(ex)

ob = pr.observation
ob.register_callback(success_callback)
ob.register_errback(error_callback)
self._observations_err_callbacks.append(ob.error)
self._observations.append(ob)

async def _remove_timedout_observations(self):
    """Reset observations that appear to be dead.

    An observation is considered dead when no observation callback has
    fired within OBSERVATION_TIMEOUT + OBSERVATION_SLEEP_TIME seconds
    (both defined in const) of the request that triggered this check.

    Only one check runs at a time (``self._is_checking``) and only one
    reset runs at a time (``self._is_resetting``); both flags are always
    released via try/finally so an early return or an exception cannot
    leave the watchdog permanently disabled.
    """
    if self._is_checking:
        _LOGGER.debug("Already checking for observations...")
        return

    self._is_checking = True
    try:
        current_time = time.time()
        # NOTE(review): the ``loop=`` argument to asyncio.sleep is
        # deprecated since Python 3.8 and removed in 3.10 — confirm the
        # supported Python versions before dropping it.
        await asyncio.sleep(OBSERVATION_SLEEP_TIME, loop=self._loop)

        if (current_time - APIFactory.get_last_changed()) > \
                (OBSERVATION_TIMEOUT + OBSERVATION_SLEEP_TIME):
            _LOGGER.warning('Resetting Tradfri observations...')

            if self._is_resetting:
                # Another task is already resetting; the finally below
                # still clears _is_checking (the original code leaked it
                # here, blocking every future check).
                return

            self._is_resetting = True
            try:
                # Drain the observation list, notifying errbacks and
                # cancelling each observation so aiocoap tears down the
                # underlying exchange.
                while self._observations:
                    ob = self._observations.pop()
                    for c in ob.errbacks:
                        c(None)
                    ob.cancel()

                APIFactory.update_last_changed()
            finally:
                self._is_resetting = False
    finally:
        self._is_checking = False

async def generate_psk(self, security_key):
"""Generate and set a psk from the security key."""
Expand All @@ -203,6 +251,14 @@ async def generate_psk(self, security_key):

return self._psk

@classmethod
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that an APIFactory will be created for each gateway, rather than each observation, should this be instanced instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this function is called from the callback defined in _observe(), when trying to use an instance method, the callback doesn't have any relations to self or an instance of APIFactory anymore. That's why I put this on a 'global' timer.

If you can point me out on how to use an instance method in the aforementioned callback, I would gladly implement that change.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

success_callback is created in scope of self?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

success_callback only has its own scope within the method.

def update_last_changed(cls):
    """Stamp the class-level timer with the current time.

    Called from the observation success callback to record that the
    gateway is still sending events.
    """
    cls.last_changed = time.time()

@classmethod
def get_last_changed(cls):
    """Return the epoch timestamp of the most recent observation event."""
    return cls.last_changed


def _process_output(res, parse_json=True):
"""Process output."""
Expand Down
3 changes: 3 additions & 0 deletions pytradfri/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,6 @@
SUPPORT_HEX_COLOR = 4
SUPPORT_RGB_COLOR = 8
SUPPORT_XY_COLOR = 16

OBSERVATION_SLEEP_TIME = 5
OBSERVATION_TIMEOUT = 10
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not sold on these values, if I don't touch any of my lights for a few hours they won't send any events IIRC. Meaning that the _check_observations function taking a guess at best, and it'll consider all my observations to be 'dead' when I make a request.

I'm cautious as many users (such as myself) have good stability on observations, I can have connections open for weeks without issue. This change potentially closes observations unnecessarily.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That behaviour should never happen though. In my testing it still hasn't. Even after a few hours or close to a day, the observations don't just reset. They just work. What happens is the following flow:

  1. You send a command to the Tradfri gateway
  2. The check method for dead observations is triggered
    1. This method sets checking = True
    2. This method waits for 5 seconds initially
    3. go to 3 below
    4. The last updated global timer is checked against the values defined in const
      1. If the time is above the defined times, the observations will be reset
      2. If the time is below the defined times, nothing happens
    5. Set checking = False
  3. The Tradfri hub sends a response in the observation callback
    1. The state of the object is changed from the callback
    2. The last updated global time is updated
    3. go to 2.4 above

In my experience most of this happens in a fraction of a second. Almost all of my requests were sub second response times. In some edge cases it took a bit longer, but not that long. We can always up this value though. But I encourage you to test it out for yourself, perhaps with debug logging like:

_LOGGER.debug('Tradfri response_time %f', time.time() - APIFactory.get_last_changed())

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3.1, what's causing the observation callback to be triggered? If there's no activity on any of the devices, what's causing the gateway to send a message? I think this is my largest point of confusion in this change.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the device responds, the observation callback will be triggered. It's the same as before.

But if no response from the Tradfri gateway will be received within the timeout window, the observations will be reset. That's the only functional change.

42 changes: 40 additions & 2 deletions pytradfri/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,21 @@ def _value_validate(self, value, rnge, identifier="Given"):
raise ValueError('%s value must be between %d and %d.'
% (identifier, rnge[0], rnge[1]))

def _filter_duplicates(self, values=None):
    """Return True if *values* contains at least one real state change.

    A key counts as a change when it is ATTR_DEVICE_STATE (state writes
    are always forwarded), when it is absent from the current raw state,
    or when its value differs from the current raw state.

    Returns False for ``values=None`` or when every entry matches the
    current state, letting the caller skip a no-op gateway request.
    (The original docstring claimed this "removes" duplicates — it only
    reports whether any non-duplicate exists.)
    """
    if values is None:
        return False

    current = self.raw[0]
    # 'k in current and current[k] != v' reduces to 'current[k] != v'
    # because 'k not in current' was already ruled out by the first test.
    return any(
        k == ATTR_DEVICE_STATE or k not in current or current[k] != v
        for k, v in values.items()
    )

def set_values(self, values, *, index=0):
"""
Set values on light control.
Expand All @@ -307,7 +322,11 @@ def set_values(self, values, *, index=0):
assert len(self.raw) == 1, \
'Only devices with 1 light supported'

return Command('put', self._device.path, {
method = None
if self._filter_duplicates(values):
method = 'put'

return Command(method, self._device.path, {
ATTR_LIGHT_CONTROL: [
values
]
Expand Down Expand Up @@ -408,6 +427,21 @@ def set_state(self, state, *, index=0):
ATTR_DEVICE_STATE: int(state)
}, index=index)

def _filter_duplicates(self, values=None):
"""
Removes duplicate state changes from the input object.
"""
if values is None:
return False

commands = {}
for k, v in values.items():
if k not in self.raw[0] or \
(k in self.raw[0] and self.raw[0][k] != v):
commands[k] = v

return len(commands) > 0

def set_values(self, values, *, index=0):
"""
Set values on socket control.
Expand All @@ -416,7 +450,11 @@ def set_values(self, values, *, index=0):
assert len(self.raw) == 1, \
'Only devices with 1 socket supported'

return Command('put', self._device.path, {
method = None
if self._filter_duplicates(values):
method = 'put'

return Command(method, self._device.path, {
ATTR_SWITCH_PLUG: [
values
]
Expand Down