Skip to content
179 changes: 116 additions & 63 deletions homeassistant/components/telegram_bot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,25 @@
import os

import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
import voluptuous as vol

from homeassistant.components.notify import (
ATTR_MESSAGE, ATTR_TITLE, ATTR_DATA)
from homeassistant.config import load_yaml_config_file
from homeassistant.const import (
CONF_PLATFORM, CONF_API_KEY, CONF_TIMEOUT, ATTR_LATITUDE, ATTR_LONGITUDE)
CONF_PLATFORM, CONF_API_KEY, CONF_TIMEOUT, ATTR_LATITUDE, ATTR_LONGITUDE,
HTTP_DIGEST_AUTHENTICATION)
import homeassistant.helpers.config_validation as cv
from homeassistant.exceptions import TemplateError
from homeassistant.setup import async_prepare_setup_platform

REQUIREMENTS = ['python-telegram-bot==6.0.1']

_LOGGER = logging.getLogger(__name__)

ATTR_ARGS = 'args'
ATTR_AUTHENTICATION = 'authentication'
ATTR_CALLBACK_QUERY = 'callback_query'
ATTR_CALLBACK_QUERY_ID = 'callback_query_id'
ATTR_CAPTION = 'caption'
Expand Down Expand Up @@ -104,16 +108,17 @@
SERVICE_SEND_PHOTO = 'send_photo'
SERVICE_SEND_DOCUMENT = 'send_document'
SERVICE_SCHEMA_SEND_FILE = BASE_SERVICE_SCHEMA.extend({
vol.Optional(ATTR_URL): cv.string,
vol.Optional(ATTR_FILE): cv.string,
vol.Optional(ATTR_CAPTION): cv.string,
vol.Optional(ATTR_URL): cv.template,
vol.Optional(ATTR_FILE): cv.template,
vol.Optional(ATTR_CAPTION): cv.template,
vol.Optional(ATTR_USERNAME): cv.string,
vol.Optional(ATTR_PASSWORD): cv.string,
vol.Optional(ATTR_AUTHENTICATION): cv.string,
})
SERVICE_SEND_LOCATION = 'send_location'
SERVICE_SCHEMA_SEND_LOCATION = BASE_SERVICE_SCHEMA.extend({
vol.Required(ATTR_LONGITUDE): float,
vol.Required(ATTR_LATITUDE): float,
vol.Required(ATTR_LONGITUDE): cv.template,
vol.Required(ATTR_LATITUDE): cv.template,
})
SERVICE_EDIT_MESSAGE = 'edit_message'
SERVICE_SCHEMA_EDIT_MESSAGE = SERVICE_SCHEMA_SEND_MESSAGE.extend({
Expand All @@ -124,7 +129,7 @@
SERVICE_SCHEMA_EDIT_CAPTION = vol.Schema({
vol.Required(ATTR_MESSAGEID): vol.Any(cv.positive_int, cv.string),
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
vol.Required(ATTR_CAPTION): cv.string,
vol.Required(ATTR_CAPTION): cv.template,
vol.Optional(ATTR_KEYBOARD_INLINE): cv.ensure_list,
}, extra=vol.ALLOW_EXTRA)
SERVICE_EDIT_REPLYMARKUP = 'edit_replymarkup'
Expand All @@ -136,7 +141,7 @@
SERVICE_ANSWER_CALLBACK_QUERY = 'answer_callback_query'
SERVICE_SCHEMA_ANSWER_CALLBACK_QUERY = vol.Schema({
vol.Required(ATTR_MESSAGE): cv.template,
vol.Required(ATTR_CALLBACK_QUERY_ID): cv.positive_int,
vol.Required(ATTR_CALLBACK_QUERY_ID): vol.Coerce(int),
vol.Optional(ATTR_SHOW_ALERT): cv.boolean,
}, extra=vol.ALLOW_EXTRA)

Expand All @@ -152,24 +157,42 @@
}


def load_data(url=None, file=None, username=None, password=None):
def load_data(url=None, filepath=None,
username=None, password=None,
authentication=None, num_retries=5):
"""Load photo/document into ByteIO/File container from a source."""
try:
if url is not None:
# Load photo from URL
params = {"timeout": 15}
if username is not None and password is not None:
req = requests.get(url, auth=(username, password), timeout=15)
else:
req = requests.get(url, timeout=15)
return io.BytesIO(req.content)

elif file is not None:
if authentication == HTTP_DIGEST_AUTHENTICATION:
params["auth"] = HTTPDigestAuth(username, password)
else:
params["auth"] = HTTPBasicAuth(username, password)
retry_num = 0
while retry_num < num_retries:
req = requests.get(url, **params)
if not req.ok:
_LOGGER.warning("Status code %s (retry #%s) loading %s.",
req.status_code, retry_num + 1, url)
else:
data = io.BytesIO(req.content)
if data.read():
data.seek(0)
return data
_LOGGER.warning("Empty data (retry #%s) in %s).",
retry_num + 1, url)
retry_num += 1
_LOGGER.warning("Can't load photo in %s after %s retries.",
url, retry_num)
elif filepath is not None:
# Load photo from file
return open(file, "rb")
return open(filepath, "rb")
else:
_LOGGER.warning("Can't load photo. No photo found in params!")

except OSError as error:
except (OSError, TypeError) as error:
_LOGGER.error("Can't load photo into ByteIO: %s", error)

return None
Expand Down Expand Up @@ -219,13 +242,24 @@ def async_send_telegram_message(service):
def _render_template_attr(data, attribute):
attribute_templ = data.get(attribute)
if attribute_templ:
attribute_templ.hass = hass
data[attribute] = attribute_templ.async_render()
if any([isinstance(attribute_templ, vtype)
for vtype in [float, int, str]]):
data[attribute] = attribute_templ
else:
attribute_templ.hass = hass
try:
data[attribute] = attribute_templ.async_render()
except TemplateError as exc:
_LOGGER.error(
"TemplateError in %s: %s -> %s",
attribute, attribute_templ.template, exc)
data[attribute] = attribute_templ.template

msgtype = service.service
kwargs = dict(service.data)
_render_template_attr(kwargs, ATTR_MESSAGE)
_render_template_attr(kwargs, ATTR_TITLE)
for attribute in [ATTR_MESSAGE, ATTR_TITLE, ATTR_URL, ATTR_FILE,
ATTR_CAPTION, ATTR_LONGITUDE, ATTR_LATITUDE]:
_render_template_attr(kwargs, attribute)
_LOGGER.debug("NEW telegram_message %s: %s", msgtype, kwargs)

if msgtype == SERVICE_SEND_MESSAGE:
Expand Down Expand Up @@ -296,48 +330,56 @@ def _get_msg_ids(self, msg_data, chat_id):
return message_id, inline_message_id

def _get_target_chat_ids(self, target):
"""Validate chat_id targets or return default target (fist defined).
"""Validate chat_id targets or return default target (first).

:param target: optional list of strings or ints (['12234'] or [12234])
:param target: optional list of integers ([12234, -12345])
:return list of chat_id targets (integers)
"""
if target is not None:
if isinstance(target, int):
if target in self.allowed_chat_ids:
return [target]
_LOGGER.warning("BAD TARGET %s, using default: %s",
target, self._default_user)
else:
try:
chat_ids = [int(t) for t in target
if int(t) in self.allowed_chat_ids]
if len(chat_ids) > 0:
return chat_ids
_LOGGER.warning("ALL BAD TARGETS: %s", target)
except (ValueError, TypeError):
_LOGGER.warning("BAD TARGET DATA %s, using default: %s",
target, self._default_user)
target = [target]
chat_ids = [t for t in target if t in self.allowed_chat_ids]
if chat_ids:
return chat_ids
_LOGGER.warning("Unallowed targets: %s, using default: %s",
target, self._default_user)
return [self._default_user]

def _get_msg_kwargs(self, data):
"""Get parameters in message data kwargs."""
def _make_row_of_kb(row_keyboard):
"""Make a list of InlineKeyboardButtons from a list of tuples.

:param row_keyboard: [(text_b1, data_callback_b1),
(text_b2, data_callback_b2), ...]
def _make_row_inline_keyboard(row_keyboard):
"""Make a list of InlineKeyboardButtons.

It can accept:
- a list of tuples like:
`[(text_b1, data_callback_b1),
(text_b2, data_callback_b2), ...]
- a string like: `/cmd1, /cmd2, /cmd3`
- or a string like: `text_b1:/cmd1, text_b2:/cmd2`
"""
from telegram import InlineKeyboardButton
buttons = []
if isinstance(row_keyboard, str):
return [InlineKeyboardButton(
key.strip()[1:].upper(), callback_data=key)
for key in row_keyboard.split(",")]
for key in row_keyboard.split(","):
if ':/' in key:
# commands like: 'Label:/cmd' become ('Label', '/cmd')
label = key.split(':/')[0]
command = key[len(label) + 1:]
buttons.append(
InlineKeyboardButton(label, callback_data=command))
else:
# commands like: '/cmd' become ('CMD', '/cmd')
label = key.strip()[1:].upper()
buttons.append(
InlineKeyboardButton(label, callback_data=key))
elif isinstance(row_keyboard, list):
return [InlineKeyboardButton(
text_btn, callback_data=data_btn)
for text_btn, data_btn in row_keyboard]
for entry in row_keyboard:
text_btn, data_btn = entry
buttons.append(
InlineKeyboardButton(text_btn, callback_data=data_btn))
else:
raise ValueError(str(row_keyboard))
return buttons

# Defaults
params = {
Expand Down Expand Up @@ -372,7 +414,7 @@ def _make_row_of_kb(row_keyboard):
keys = data.get(ATTR_KEYBOARD_INLINE)
keys = keys if isinstance(keys, list) else [keys]
params[ATTR_REPLYMARKUP] = InlineKeyboardMarkup(
[_make_row_of_kb(row) for row in keys])
[_make_row_inline_keyboard(row) for row in keys])
return params

def _send_msg(self, func_send, msg_error, *args_rep, **kwargs_rep):
Expand Down Expand Up @@ -446,20 +488,26 @@ def answer_callback_query(self, message, callback_query_id,

def send_file(self, is_photo=True, target=None, **kwargs):
"""Send a photo or a document."""
file = load_data(
params = self._get_msg_kwargs(kwargs)
caption = kwargs.get(ATTR_CAPTION)
func_send = self.bot.sendPhoto if is_photo else self.bot.sendDocument
file_content = load_data(
url=kwargs.get(ATTR_URL),
file=kwargs.get(ATTR_FILE),
filepath=kwargs.get(ATTR_FILE),
username=kwargs.get(ATTR_USERNAME),
password=kwargs.get(ATTR_PASSWORD),
authentication=kwargs.get(ATTR_AUTHENTICATION),
)
params = self._get_msg_kwargs(kwargs)
caption = kwargs.get(ATTR_CAPTION)
func_send = self.bot.sendPhoto if is_photo else self.bot.sendDocument
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug("send file %s to chat_id %s. Caption: %s.",
file, chat_id, caption)
self._send_msg(func_send, "Error sending file",
chat_id, file, caption=caption, **params)
if file_content:
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug("send file to chat_id %s. Caption: %s.",
chat_id, caption)
self._send_msg(func_send, "Error sending file",
chat_id, io.BytesIO(file_content.read()),
caption=caption, **params)
file_content.seek(0)
else:
_LOGGER.error("Can't send file with kwargs: %s", kwargs)

def send_location(self, latitude, longitude, target=None, **kwargs):
"""Send a location."""
Expand Down Expand Up @@ -495,18 +543,23 @@ def _get_message_data(self, msg_data):
_LOGGER.error("Incoming message does not have required data (%s)",
msg_data)
return False, None
if msg_data['from'].get('id') not in self.allowed_chat_ids \
or msg_data['chat'].get('id') not in self.allowed_chat_ids:

if (msg_data['from'].get('id') not in self.allowed_chat_ids or
('chat' in msg_data and
msg_data['chat'].get('id') not in self.allowed_chat_ids)):
# Origin is not allowed.
_LOGGER.error("Incoming message is not allowed (%s)", msg_data)
return True, None

return True, {
data = {
ATTR_USER_ID: msg_data['from']['id'],
ATTR_CHAT_ID: msg_data['chat']['id'],
ATTR_FROM_FIRST: msg_data['from']['first_name'],
ATTR_FROM_LAST: msg_data['from']['last_name']
}
if 'chat' in msg_data:
data[ATTR_CHAT_ID] = msg_data['chat']['id']

return True, data

def process_message(self, data):
"""Check for basic message rules and fire an event if message is ok."""
Expand Down
2 changes: 1 addition & 1 deletion homeassistant/components/telegram_bot/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ send_message:

inline_keyboard:
description: List of rows of commands, comma-separated, to make a custom inline keyboard with buttons with asociated callback data.
example: '["/button1, /button2", "/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'
example: '["/button1, /button2", "/button3"] or ["Text button1:/button1, Text button2:/button2", "Text button3:/button3"] or [[["Text button1", "/button1"], ["Text button2", "/button2"]], [["Text button3", "/button3"]]]'

send_photo:
description: Send a photo
Expand Down