Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 25 additions & 48 deletions homeassistant/components/sensor/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
vol.Optional(CONF_JSON_ATTRS, default=[]): cv.ensure_list_csv,
vol.Optional(CONF_EXPIRE_AFTER): cv.positive_int,
vol.Optional(CONF_FORCE_UPDATE, default=DEFAULT_FORCE_UPDATE): cv.boolean,
# Integrations shouldn't never expose unique_id through configuration
# this here is an exception because MQTT is a msg transport, not a protocol
# Integrations should never expose unique_id through configuration.
# This is an exception because MQTT is a message transport, not a protocol.
vol.Optional(CONF_UNIQUE_ID): cv.string,
vol.Optional(CONF_DEVICE): mqtt.MQTT_ENTITY_DEVICE_INFO_SCHEMA,
}).extend(mqtt.MQTT_AVAILABILITY_SCHEMA.schema)
Expand Down Expand Up @@ -86,32 +86,20 @@ class MqttSensor(MqttAvailability, MqttDiscoveryUpdate, MqttEntityDeviceInfo,

def __init__(self, config, discovery_hash):
"""Initialize the sensor."""
self._config = config
self._unique_id = config.get(CONF_UNIQUE_ID)
self._state = STATE_UNKNOWN
self._sub_state = None
self._expiration_trigger = None
self._attributes = None

self._name = None
self._state_topic = None
self._qos = None
self._unit_of_measurement = None
self._force_update = None
self._template = None
self._expire_after = None
self._icon = None
self._device_class = None
self._json_attributes = None
self._unique_id = None

# Load config
self._setup_from_config(config)

availability_topic = config.get(CONF_AVAILABILITY_TOPIC)
payload_available = config.get(CONF_PAYLOAD_AVAILABLE)
payload_not_available = config.get(CONF_PAYLOAD_NOT_AVAILABLE)
qos = config.get(CONF_QOS)
device_config = config.get(CONF_DEVICE)

MqttAvailability.__init__(self, availability_topic, self._qos,
MqttAvailability.__init__(self, availability_topic, qos,
payload_available, payload_not_available)
MqttDiscoveryUpdate.__init__(self, discovery_hash,
self.discovery_update)
Expand All @@ -125,72 +113,61 @@ async def async_added_to_hass(self):
async def discovery_update(self, discovery_payload):
"""Handle updated discovery message."""
config = PLATFORM_SCHEMA(discovery_payload)
self._setup_from_config(config)
self._config = config
await self.availability_discovery_update(config)
await self._subscribe_topics()
self.async_schedule_update_ha_state()

def _setup_from_config(self, config):
"""(Re)Setup the entity."""
self._name = config.get(CONF_NAME)
self._state_topic = config.get(CONF_STATE_TOPIC)
self._qos = config.get(CONF_QOS)
self._unit_of_measurement = config.get(CONF_UNIT_OF_MEASUREMENT)
self._force_update = config.get(CONF_FORCE_UPDATE)
self._expire_after = config.get(CONF_EXPIRE_AFTER)
self._icon = config.get(CONF_ICON)
self._device_class = config.get(CONF_DEVICE_CLASS)
self._template = config.get(CONF_VALUE_TEMPLATE)
self._json_attributes = set(config.get(CONF_JSON_ATTRS))
self._unique_id = config.get(CONF_UNIQUE_ID)

async def _subscribe_topics(self):
"""(Re)Subscribe to topics."""
if self._template is not None:
self._template.hass = self.hass
template = self._config.get(CONF_VALUE_TEMPLATE)
if template is not None:
template.hass = self.hass

@callback
def message_received(topic, payload, qos):
"""Handle new MQTT messages."""
# auto-expire enabled?
if self._expire_after is not None and self._expire_after > 0:
expire_after = self._config.get(CONF_EXPIRE_AFTER)
if expire_after is not None and expire_after > 0:
# Reset old trigger
if self._expiration_trigger:
self._expiration_trigger()
self._expiration_trigger = None

# Set new trigger
expiration_at = (
dt_util.utcnow() + timedelta(seconds=self._expire_after))
dt_util.utcnow() + timedelta(seconds=expire_after))

self._expiration_trigger = async_track_point_in_utc_time(
self.hass, self.value_is_expired, expiration_at)

if self._json_attributes:
json_attributes = set(self._config.get(CONF_JSON_ATTRS))
if json_attributes:
self._attributes = {}
try:
json_dict = json.loads(payload)
if isinstance(json_dict, dict):
attrs = {k: json_dict[k] for k in
self._json_attributes & json_dict.keys()}
json_attributes & json_dict.keys()}
self._attributes = attrs
else:
_LOGGER.warning("JSON result was not a dictionary")
except ValueError:
_LOGGER.warning("MQTT payload could not be parsed as JSON")
_LOGGER.debug("Erroneous JSON: %s", payload)

if self._template is not None:
payload = self._template.async_render_with_possible_json_value(
if template is not None:
payload = template.async_render_with_possible_json_value(
payload, self._state)
self._state = payload
self.async_schedule_update_ha_state()

self._sub_state = await subscription.async_subscribe_topics(
self.hass, self._sub_state,
{'state_topic': {'topic': self._state_topic,
{'state_topic': {'topic': self._config.get(CONF_STATE_TOPIC),
'msg_callback': message_received,
'qos': self._qos}})
'qos': self._config.get(CONF_QOS)}})

async def async_will_remove_from_hass(self):
"""Unsubscribe when removed."""
Expand All @@ -212,17 +189,17 @@ def should_poll(self):
@property
def name(self):
"""Return the name of the sensor."""
return self._name
return self._config.get(CONF_NAME)

@property
def unit_of_measurement(self):
"""Return the unit this state is expressed in."""
return self._unit_of_measurement
return self._config.get(CONF_UNIT_OF_MEASUREMENT)

@property
def force_update(self):
"""Force update."""
return self._force_update
return self._config.get(CONF_FORCE_UPDATE)

@property
def state(self):
Expand All @@ -242,9 +219,9 @@ def unique_id(self):
@property
def icon(self):
"""Return the icon."""
return self._icon
return self._config.get(CONF_ICON)

@property
def device_class(self) -> Optional[str]:
"""Return the device class of the sensor."""
return self._device_class
return self._config.get(CONF_DEVICE_CLASS)