Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion homeassistant/components/mqtt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
from homeassistant.util.logging import catch_log_exception

# Loading the config flow file will register the flow
from . import config_flow, discovery, server # noqa: F401 pylint: disable=unused-import
from . import config_flow # noqa: F401 pylint: disable=unused-import
from . import debug_info, discovery, server
from .const import (
ATTR_DISCOVERY_HASH,
ATTR_DISCOVERY_TOPIC,
Expand All @@ -56,6 +57,7 @@
DEFAULT_QOS,
PROTOCOL_311,
)
from .debug_info import log_messages
from .discovery import MQTT_DISCOVERY_UPDATED, clear_discovery_hash, set_discovery_hash
from .models import Message, MessageCallbackType, PublishPayloadType
from .subscription import async_subscribe_topics, async_unsubscribe_topics
Expand Down Expand Up @@ -513,6 +515,7 @@ async def async_setup(hass: HomeAssistantType, config: ConfigType) -> bool:

websocket_api.async_register_command(hass, websocket_subscribe)
websocket_api.async_register_command(hass, websocket_remove_device)
websocket_api.async_register_command(hass, websocket_mqtt_info)

if conf is None:
# If we have a config entry, setup is done by that config entry.
Expand Down Expand Up @@ -1058,6 +1061,7 @@ async def _attributes_subscribe_topics(self):
attr_tpl.hass = self.hass

@callback
@log_messages(self.hass, self.entity_id)
def attributes_message_received(msg: Message) -> None:
try:
payload = msg.payload
Expand Down Expand Up @@ -1122,6 +1126,7 @@ async def _availability_subscribe_topics(self):
"""(Re)Subscribe to topics."""

@callback
@log_messages(self.hass, self.entity_id)
def availability_message_received(msg: Message) -> None:
"""Handle a new received MQTT availability message."""
if msg.payload == self._avail_config[CONF_PAYLOAD_AVAILABLE]:
Expand Down Expand Up @@ -1207,6 +1212,7 @@ async def discovery_callback(payload):
_LOGGER.info(
"Got update for entity with hash: %s '%s'", discovery_hash, payload,
)
debug_info.update_entity_discovery_data(self.hass, payload, self.entity_id)
if not payload:
# Empty payload: Remove component
_LOGGER.info("Removing component: %s", self.entity_id)
Expand All @@ -1219,6 +1225,9 @@ async def discovery_callback(payload):
await self._discovery_update(payload)

if discovery_hash:
debug_info.add_entity_discovery_data(
self.hass, self._discovery_data, self.entity_id
)
# Set in case the entity has been removed and is re-added
set_discovery_hash(self.hass, discovery_hash)
self._remove_signal = async_dispatcher_connect(
Expand All @@ -1242,6 +1251,7 @@ async def async_will_remove_from_hass(self) -> None:
def _cleanup_on_remove(self) -> None:
"""Stop listening to signal and cleanup discovery data."""
if self._discovery_data and not self._removed_from_hass:
debug_info.remove_entity_data(self.hass, self.entity_id)
clear_discovery_hash(self.hass, self._discovery_data[ATTR_DISCOVERY_HASH])
self._removed_from_hass = True

Expand Down Expand Up @@ -1303,6 +1313,18 @@ def device_info(self):
return device_info_from_config(self._device_config)


@websocket_api.websocket_command(
{vol.Required("type"): "mqtt/device/debug_info", vol.Required("device_id"): str}
)
@websocket_api.async_response
async def websocket_mqtt_info(hass, connection, msg):
"""Get MQTT debug info for device."""
device_id = msg["device_id"]
mqtt_info = await debug_info.info_for_device(hass, device_id)

connection.send_result(msg["id"], mqtt_info)


@websocket_api.websocket_command(
{vol.Required("type"): "mqtt/device/remove", vol.Required("device_id"): str}
)
Expand Down
1 change: 1 addition & 0 deletions homeassistant/components/mqtt/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
DEFAULT_DISCOVERY = False

ATTR_DISCOVERY_HASH = "discovery_hash"
ATTR_DISCOVERY_PAYLOAD = "discovery_payload"
ATTR_DISCOVERY_TOPIC = "discovery_topic"
CONF_STATE_TOPIC = "state_topic"
PROTOCOL_311 = "3.1.1"
Expand Down
146 changes: 146 additions & 0 deletions homeassistant/components/mqtt/debug_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""Helper to handle a set of topics to subscribe to."""
from collections import deque
from functools import wraps
import logging
from typing import Any

from homeassistant.helpers.typing import HomeAssistantType

from .const import ATTR_DISCOVERY_PAYLOAD, ATTR_DISCOVERY_TOPIC
from .models import MessageCallbackType

_LOGGER = logging.getLogger(__name__)

DATA_MQTT_DEBUG_INFO = "mqtt_debug_info"
STORED_MESSAGES = 10


def log_messages(hass: HomeAssistantType, entity_id: str) -> MessageCallbackType:
"""Wrap an MQTT message callback to support message logging."""

def _log_message(msg):
"""Log message."""
debug_info = hass.data[DATA_MQTT_DEBUG_INFO]
messages = debug_info["entities"][entity_id]["topics"][msg.topic]
messages.append(msg.payload)

def _decorator(msg_callback: MessageCallbackType):
@wraps(msg_callback)
def wrapper(msg: Any) -> None:
"""Log message."""
_log_message(msg)
msg_callback(msg)

setattr(wrapper, "__entity_id", entity_id)
return wrapper

return _decorator


def add_topic(hass, message_callback, topic):
"""Prepare debug data for topic."""
entity_id = getattr(message_callback, "__entity_id", None)
if entity_id:
debug_info = hass.data.setdefault(
DATA_MQTT_DEBUG_INFO, {"entities": {}, "triggers": {}}
)
entity_info = debug_info["entities"].setdefault(
entity_id, {"topics": {}, "discovery_data": {}}
)
entity_info["topics"][topic] = deque([], STORED_MESSAGES)


def remove_topic(hass, message_callback, topic):
"""Remove debug data for topic."""
entity_id = getattr(message_callback, "__entity_id", None)
if entity_id and entity_id in hass.data[DATA_MQTT_DEBUG_INFO]["entities"]:
hass.data[DATA_MQTT_DEBUG_INFO]["entities"][entity_id]["topics"].pop(topic)


def add_entity_discovery_data(hass, discovery_data, entity_id):
"""Add discovery data."""
debug_info = hass.data.setdefault(
DATA_MQTT_DEBUG_INFO, {"entities": {}, "triggers": {}}
)
entity_info = debug_info["entities"].setdefault(
entity_id, {"topics": {}, "discovery_data": {}}
)
entity_info["discovery_data"] = discovery_data


def update_entity_discovery_data(hass, discovery_payload, entity_id):
"""Update discovery data."""
entity_info = hass.data[DATA_MQTT_DEBUG_INFO]["entities"][entity_id]
entity_info["discovery_data"][ATTR_DISCOVERY_PAYLOAD] = discovery_payload


def remove_entity_data(hass, entity_id):
"""Remove discovery data."""
hass.data[DATA_MQTT_DEBUG_INFO]["entities"].pop(entity_id)


def add_trigger_discovery_data(hass, discovery_hash, discovery_data, device_id):
"""Add discovery data."""
debug_info = hass.data.setdefault(
DATA_MQTT_DEBUG_INFO, {"entities": {}, "triggers": {}}
)
debug_info["triggers"][discovery_hash] = {
"device_id": device_id,
"discovery_data": discovery_data,
}


def update_trigger_discovery_data(hass, discovery_hash, discovery_payload):
"""Update discovery data."""
trigger_info = hass.data[DATA_MQTT_DEBUG_INFO]["triggers"][discovery_hash]
trigger_info["discovery_data"][ATTR_DISCOVERY_PAYLOAD] = discovery_payload


def remove_trigger_discovery_data(hass, discovery_hash):
"""Remove discovery data."""
hass.data[DATA_MQTT_DEBUG_INFO]["triggers"][discovery_hash]["discovery_data"] = None


async def info_for_device(hass, device_id):
"""Get debug info for a device."""
mqtt_info = {"entities": [], "triggers": []}
entity_registry = await hass.helpers.entity_registry.async_get_registry()

entries = hass.helpers.entity_registry.async_entries_for_device(
entity_registry, device_id
)
mqtt_debug_info = hass.data.setdefault(
DATA_MQTT_DEBUG_INFO, {"entities": {}, "triggers": {}}
)
for entry in entries:
if entry.entity_id not in mqtt_debug_info["entities"]:
continue

entity_info = mqtt_debug_info["entities"][entry.entity_id]
topics = [
{"topic": topic, "messages": list(messages)}
for topic, messages in entity_info["topics"].items()
]
discovery_data = {
"topic": entity_info["discovery_data"].get(ATTR_DISCOVERY_TOPIC, ""),
"payload": entity_info["discovery_data"].get(ATTR_DISCOVERY_PAYLOAD, ""),
}
mqtt_info["entities"].append(
{
"entity_id": entry.entity_id,
"topics": topics,
"discovery_data": discovery_data,
}
)

for trigger in mqtt_debug_info["triggers"].values():
if trigger["device_id"] != device_id:
continue

discovery_data = {
"topic": trigger["discovery_data"][ATTR_DISCOVERY_TOPIC],
"payload": trigger["discovery_data"][ATTR_DISCOVERY_PAYLOAD],
}
mqtt_info["triggers"].append({"discovery_data": discovery_data})

return mqtt_info
7 changes: 7 additions & 0 deletions homeassistant/components/mqtt/device_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
CONF_QOS,
DOMAIN,
cleanup_device_registry,
debug_info,
)
from .discovery import MQTT_DISCOVERY_UPDATED, clear_discovery_hash

Expand Down Expand Up @@ -183,6 +184,7 @@ async def discovery_update(payload):
if not payload:
# Empty payload: Remove trigger
_LOGGER.info("Removing trigger: %s", discovery_hash)
debug_info.remove_trigger_discovery_data(hass, discovery_hash)
if discovery_id in hass.data[DEVICE_TRIGGERS]:
device_trigger = hass.data[DEVICE_TRIGGERS][discovery_id]
device_trigger.detach_trigger()
Expand All @@ -192,6 +194,7 @@ async def discovery_update(payload):
else:
# Non-empty payload: Update trigger
_LOGGER.info("Updating trigger: %s", discovery_hash)
debug_info.update_trigger_discovery_data(hass, discovery_hash, payload)
config = TRIGGER_DISCOVERY_SCHEMA(payload)
await _update_device(hass, config_entry, config)
device_trigger = hass.data[DEVICE_TRIGGERS][discovery_id]
Expand Down Expand Up @@ -230,6 +233,9 @@ async def discovery_update(payload):
await hass.data[DEVICE_TRIGGERS][discovery_id].update_trigger(
config, discovery_hash, remove_signal
)
debug_info.add_trigger_discovery_data(
hass, discovery_hash, discovery_data, device.id
)


async def async_device_removed(hass: HomeAssistant, device_id: str):
Expand All @@ -241,6 +247,7 @@ async def async_device_removed(hass: HomeAssistant, device_id: str):
discovery_hash = device_trigger.discovery_data[ATTR_DISCOVERY_HASH]
discovery_topic = device_trigger.discovery_data[ATTR_DISCOVERY_TOPIC]

debug_info.remove_trigger_discovery_data(hass, discovery_hash)
device_trigger.detach_trigger()
clear_discovery_hash(hass, discovery_hash)
device_trigger.remove_signal()
Expand Down
3 changes: 2 additions & 1 deletion homeassistant/components/mqtt/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from homeassistant.helpers.typing import HomeAssistantType

from .abbreviations import ABBREVIATIONS, DEVICE_ABBREVIATIONS
from .const import ATTR_DISCOVERY_HASH, ATTR_DISCOVERY_TOPIC
from .const import ATTR_DISCOVERY_HASH, ATTR_DISCOVERY_PAYLOAD, ATTR_DISCOVERY_TOPIC

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -135,6 +135,7 @@ async def async_device_message_received(msg):
setattr(payload, "__configuration_source__", f"MQTT (topic: '{topic}')")
discovery_data = {
ATTR_DISCOVERY_HASH: discovery_hash,
ATTR_DISCOVERY_PAYLOAD: payload,
ATTR_DISCOVERY_TOPIC: topic,
}
setattr(payload, "discovery_data", discovery_data)
Expand Down
2 changes: 2 additions & 0 deletions homeassistant/components/mqtt/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
MqttEntityDeviceInfo,
subscription,
)
from .debug_info import log_messages
from .discovery import MQTT_DISCOVERY_NEW, clear_discovery_hash

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -137,6 +138,7 @@ async def _subscribe_topics(self):
template.hass = self.hass

@callback
@log_messages(self.hass, self.entity_id)
def message_received(msg):
"""Handle new MQTT messages."""
payload = msg.payload
Expand Down
10 changes: 10 additions & 0 deletions homeassistant/components/mqtt/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.loader import bind_hass

from . import debug_info
from .const import DEFAULT_QOS
from .models import MessageCallbackType

Expand All @@ -18,6 +19,7 @@
class EntitySubscription:
"""Class to hold data about an active entity topic subscription."""

hass = attr.ib(type=HomeAssistantType)
topic = attr.ib(type=str)
message_callback = attr.ib(type=MessageCallbackType)
unsubscribe_callback = attr.ib(type=Optional[Callable[[], None]])
Expand All @@ -31,11 +33,16 @@ async def resubscribe_if_necessary(self, hass, other):

if other is not None and other.unsubscribe_callback is not None:
other.unsubscribe_callback()
# Clear debug data if it exists
debug_info.remove_topic(self.hass, other.message_callback, other.topic)

if self.topic is None:
# We were asked to remove the subscription or not to create it
return

# Prepare debug data
debug_info.add_topic(self.hass, self.message_callback, self.topic)

self.unsubscribe_callback = await mqtt.async_subscribe(
hass, self.topic, self.message_callback, self.qos, self.encoding
)
Expand Down Expand Up @@ -77,6 +84,7 @@ async def async_subscribe_topics(
unsubscribe_callback=None,
qos=value.get("qos", DEFAULT_QOS),
encoding=value.get("encoding", "utf-8"),
hass=hass,
)
# Get the current subscription state
current = current_subscriptions.pop(key, None)
Expand All @@ -87,6 +95,8 @@ async def async_subscribe_topics(
for remaining in current_subscriptions.values():
if remaining.unsubscribe_callback is not None:
remaining.unsubscribe_callback()
# Clear debug data if it exists
debug_info.remove_topic(hass, remaining.message_callback, remaining.topic)

return new_state

Expand Down
Loading