Skip to content
33 changes: 31 additions & 2 deletions homeassistant/components/sensor/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
import asyncio
import logging
from datetime import timedelta

import voluptuous as vol

Expand All @@ -16,10 +17,13 @@
from homeassistant.helpers.entity import Entity
import homeassistant.components.mqtt as mqtt
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.util import dt as dt_util

_LOGGER = logging.getLogger(__name__)

CONF_FORCE_UPDATE = 'force_update'
CONF_EXPIRE_AFTER = 'expire_after'

DEFAULT_NAME = 'MQTT Sensor'
DEFAULT_FORCE_UPDATE = False
Expand All @@ -28,6 +32,7 @@
PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_EXPIRE_AFTER, default=0): cv.positive_int,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the default and just check if self._expire_after is not None: below?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can but i'd still have to check 0 and None because with 0 it would cause strange behaviour otherwise.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. Sounds good to me.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which one? default or check 0 and None?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, you can keep it as is. keep the default zero.

vol.Optional(CONF_FORCE_UPDATE, default=DEFAULT_FORCE_UPDATE): cv.boolean,
})

Expand All @@ -48,6 +53,7 @@ def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
config.get(CONF_QOS),
config.get(CONF_UNIT_OF_MEASUREMENT),
config.get(CONF_FORCE_UPDATE),
config.get(CONF_EXPIRE_AFTER),
value_template,
)])

Expand All @@ -56,7 +62,7 @@ class MqttSensor(Entity):
"""Representation of a sensor that can be updated using MQTT."""

def __init__(self, name, state_topic, qos, unit_of_measurement,
force_update, value_template):
force_update, expire_after, value_template):
"""Initialize the sensor."""
self._state = STATE_UNKNOWN
self._name = name
Expand All @@ -65,6 +71,8 @@ def __init__(self, name, state_topic, qos, unit_of_measurement,
self._unit_of_measurement = unit_of_measurement
self._force_update = force_update
self._template = value_template
self._expire_after = expire_after
self._value_expiration_at = 0

def async_added_to_hass(self):
"""Subscribe mqtt events.
Expand All @@ -74,15 +82,36 @@ def async_added_to_hass(self):
@callback
def message_received(topic, payload, qos):
"""A new MQTT message has been received."""
# auto-expire enabled?
if self._expire_after > 0:
# Reset expiration time and set trigger
self._value_expiration_at = (
dt_util.utcnow() +
timedelta(seconds=self._expire_after)
)
async_track_point_in_utc_time(self.hass,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async_track_point_in_utc_time returns a function that will unsubscribe the just subscribed listener. So instead of storing a timestamp when to expire, unsubscribe the old listener if it exists and subscribe a new one.

Then in the callback, set the unsubscribe function to None

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this be thread-safe? E.g. no race-condition between unsubscribing a (probably in exactly in this moment executed callback) and a new message?

Flow with new code:

  • Callback is executed in the moment the message comes in
  • Unsubscribe is executed -> too late to have effect
  • new value is set
  • already running Callback resets the value

The more I think about it, the more I have the impression that I can also run in that with my current code:
Flow with my old code:

  • Callback is executed in the moment the message comes in
  • Callback checks time, detects that the value is to expire
  • new expire time and value is set
  • already running Callback resets the value

Any thoughts about the thread safety here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To answer this question, the mqtt component is written using Python's asyncio library, which means that only one of these callbacks can be running at a time. If you do some searching for asyncio you can find some good literature.

self.check_value_is_expired,
self._value_expiration_at +
timedelta(seconds=1))

if self._template is not None:
payload = self._template.async_render_with_possible_json_value(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you change this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line was 80 chars long (not my edit but wanted to have it fixed).

template = self._template
payload = template.async_render_with_possible_json_value(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line was >79 chars long. I wonder how it passed the checks

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looked to me that it was exactly 79 characters long. It should be fine. If it was in there it is OK for length.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, don't know why I got an error there. Maybe I accidently idented one more. Reverted.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I figured it probably got indented while working on it, then reverted.

payload, self._state)
self._state = payload
self.hass.async_add_job(self.async_update_ha_state())

return mqtt.async_subscribe(
self.hass, self._state_topic, message_received, self._qos)

@callback
def check_value_is_expired(self, *_):
"""Check if value is expired."""
if (self._expire_after > 0 and
dt_util.utcnow() > self._value_expiration_at):
self._state = STATE_UNKNOWN
self.hass.async_add_job(self.async_update_ha_state())

@property
def should_poll(self):
"""No polling needed."""
Expand Down
47 changes: 46 additions & 1 deletion tests/components/sensor/test_mqtt.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
"""The tests for the MQTT sensor platform."""
import unittest

from datetime import timedelta, datetime

import homeassistant.core as ha
from homeassistant.setup import setup_component
import homeassistant.components.sensor as sensor
from homeassistant.const import EVENT_STATE_CHANGED
from tests.common import mock_mqtt_component, fire_mqtt_message
import homeassistant.util.dt as dt_util

from tests.common import mock_mqtt_component, fire_mqtt_message
from tests.common import get_test_home_assistant, mock_component
from tests.common import fire_time_changed


class TestSensorMQTT(unittest.TestCase):
Expand Down Expand Up @@ -42,6 +46,47 @@ def test_setting_sensor_value_via_mqtt_message(self):
self.assertEqual('fav unit',
state.attributes.get('unit_of_measurement'))

def test_setting_sensor_value_expires(self):
"""Test the expiration of the value."""
mock_component(self.hass, 'mqtt')
assert setup_component(self.hass, sensor.DOMAIN, {
sensor.DOMAIN: {
'platform': 'mqtt',
'name': 'test',
'state_topic': 'test-topic',
'unit_of_measurement': 'fav unit',
'expire_after': '2',
'force_update': True
}
})

time = datetime(2017, 1, 1, 1, tzinfo=dt_util.UTC)
fire_time_changed(self.hass, time)

state = self.hass.states.get('sensor.test')
self.assertEqual('unknown', state.state)

fire_mqtt_message(self.hass, 'test-topic', '100')
self.hass.block_till_done()

state = self.hass.states.get('sensor.test')
self.assertEqual('100', state.state)

time = time + timedelta(seconds=1)
fire_time_changed(self.hass, time)

""" Not yet expired """

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments in Python start with a #

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please prefix comments with # . Only doc strings (first comment in a function) should contain triple quotes.

state = self.hass.states.get('sensor.test')
self.assertEqual('100', state.state)

time = time + timedelta(seconds=2)
fire_time_changed(self.hass, time)

""" Expired """
state = self.hass.states.get('sensor.test')
# FIXME: this will fail unless the fixmes above are fixed

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Soo, fixed now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, forgot that one ;) Yes, the test is fixed now.

# self.assertEqual('unknown', state.state)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stale comment.


def test_setting_sensor_value_via_mqtt_json_message(self):
"""Test the setting of the value via MQTT with JSON playload."""
mock_component(self.hass, 'mqtt')
Expand Down