Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ omit =
homeassistant/components/climate/sensibo.py
homeassistant/components/cover/garadget.py
homeassistant/components/cover/homematic.py
homeassistant/components/cover/knx.py
homeassistant/components/cover/myq.py
homeassistant/components/cover/opengarage.py
homeassistant/components/cover/rpi_gpio.py
Expand Down
2 changes: 2 additions & 0 deletions homeassistant/components/cover/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
'garage', # Garage door control
]

DEVICE_CLASSES_SCHEMA = vol.All(vol.Lower, vol.In(DEVICE_CLASSES))

SUPPORT_OPEN = 1
SUPPORT_CLOSE = 2
SUPPORT_SET_POSITION = 4
Expand Down
137 changes: 137 additions & 0 deletions homeassistant/components/cover/knx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""
Support for KNX covers.

Tested with an MDT roller shutter
http://www.mdt.de/download/MDT_THB_Jalousieaktor_02.pdf

Example configuration:

cover:
- platform: knx
updown_address: 9/0/0
stop_address: 9/0/1
setposition_address: 9/0/3
getposition_address: 9/0/4

"""
import logging

import voluptuous as vol

from homeassistant.components.cover import (
CoverDevice, PLATFORM_SCHEMA, ATTR_POSITION, DEVICE_CLASSES_SCHEMA
)
from homeassistant.components.knx import (KNXConfig, KNXMultiAddressDevice)
from homeassistant.const import (CONF_NAME, CONF_DEVICE_CLASS)
import homeassistant.helpers.config_validation as cv

_LOGGER = logging.getLogger(__name__)

CONF_UPDOWN = 'updown_address'
CONF_STOP = 'stop_address'
CONF_SETPOSITION_ADDRESS = 'setposition_address'
CONF_GETPOSITION_ADDRESS = 'getposition_address'

DEFAULT_NAME = 'KNX Cover'
DEPENDENCIES = ['knx']

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_UPDOWN): cv.string,
vol.Required(CONF_STOP): cv.string,
vol.Optional(CONF_SETPOSITION_ADDRESS): cv.string,
vol.Optional(CONF_GETPOSITION_ADDRESS): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA
})


def setup_platform(hass, config, add_devices, discovery_info=None):
"""Create and add an entity based on the configuration."""
add_devices([KNXCover(hass, KNXConfig(config))])


class KNXCover(KNXMultiAddressDevice, CoverDevice):
"""Representation of a KNX cover. e.g. a rollershutter."""

def __init__(self, hass, config):
"""Initialize the cover."""
KNXMultiAddressDevice.__init__(
self, hass, config,
['updown', 'stop'], # required
optional=['setposition', 'getposition']
)
self._device_class = config.config.get(CONF_DEVICE_CLASS)
self._hass = hass
self._current_pos = None
self._target_pos = None

@property
def should_poll(self):
"""Polling is needed for the KNX cover."""
return True

@property
def is_closed(self):
"""Return if the cover is closed."""
if self.current_cover_position is not None:
if self.current_cover_position > 0:
return False
else:
return True

@property
def current_cover_position(self):
"""Return current position of cover.

None is unknown, 0 is closed, 100 is fully open.
"""
return self._current_pos

@property
def target_position(self):
"""Return the position we are trying to reach: 0 - 100."""
return self._target_pos

def set_cover_position(self, **kwargs):
"""Set new target position."""
position = kwargs.get(ATTR_POSITION)
if position is None:
return

self._target_pos = position
self.set_percentage('setposition', position)
_LOGGER.debug(
"%s: Set target position to %d",
self.name, position
)

def update(self):
"""Update device state."""
super().update()
value = self.get_percentage('getposition')
if value is not None:
self._current_pos = value
_LOGGER.debug(
"%s: position = %d",
self.name, value
)

def open_cover(self, **kwargs):
"""Open the cover."""
_LOGGER.debug("%s: open: updown = 0", self.name)
self.set_int_value('updown', 0)

def close_cover(self, **kwargs):
"""Close the cover."""
_LOGGER.debug("%s: open: updown = 1", self.name)
self.set_int_value('updown', 1)

def stop_cover(self, **kwargs):
"""Stop the cover movement."""
_LOGGER.debug("%s: stop: stop = 1", self.name)
self.set_int_value('stop', 1)

@property
def device_class(self):
"""Return the class of this device, from component DEVICE_CLASSES."""
return self._device_class
55 changes: 50 additions & 5 deletions homeassistant/components/knx.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,6 @@ class KNXMultiAddressDevice(Entity):
to be controlled by multiple group addresses.
"""

names = {}
values = {}

def __init__(self, hass, config, required, optional=None):
"""Initialize the device.

Expand All @@ -226,28 +223,34 @@ def __init__(self, hass, config, required, optional=None):
"""
from knxip.core import parse_group_address, KNXException

self.names = {}
self.values = {}

self._config = config
self._state = False
self._data = None
_LOGGER.debug("Initalizing KNX multi address device")

settings = self._config.config
# parse required addresses
for name in required:
_LOGGER.info(name)
paramname = '{}{}'.format(name, '_address')
addr = self._config.config.get(paramname)
addr = settings.get(paramname)
if addr is None:
_LOGGER.exception(
"Required KNX group address %s missing", paramname)
raise KNXException(
"Group address for %s missing in configuration", paramname)
_LOGGER.debug("%s: %s=%s", settings.get('name'), paramname, addr)
addr = parse_group_address(addr)
self.names[addr] = name

# parse optional addresses
for name in optional:
paramname = '{}{}'.format(name, '_address')
addr = self._config.config.get(paramname)
addr = settings.get(paramname)
_LOGGER.debug("%s: %s=%s", settings.get('name'), paramname, addr)
if addr:
try:
addr = parse_group_address(addr)
Expand Down Expand Up @@ -285,6 +288,48 @@ def has_attribute(self, name):
return True
return False

def set_percentage(self, name, percentage):
"""Set a percentage in knx for a given attribute.

DPT_Scaling / DPT 5.001 is a single byte scaled percentage
"""
percentage = abs(percentage) # only accept positive values
scaled_value = percentage * 255 / 100
value = min(255, scaled_value)
self.set_int_value(name, value)

def get_percentage(self, name):
"""Get a percentage from knx for a given attribute.

DPT_Scaling / DPT 5.001 is a single byte scaled percentage
"""
value = self.get_int_value(name)
percentage = round(value * 100 / 255)
return percentage

def set_int_value(self, name, value, num_bytes=1):
"""Set an integer value for a given attribute."""
# KNX packets are big endian
value = round(value) # only accept integers
b_value = value.to_bytes(num_bytes, byteorder='big')
self.set_value(name, list(b_value))

def get_int_value(self, name):
"""Get an integer value for a given attribute."""
# KNX packets are big endian
summed_value = 0
raw_value = self.value(name)
try:
# convert raw value in bytes
for val in raw_value:
summed_value *= 256
summed_value += val
except TypeError:
# pknx returns a non-iterable type for unsuccessful reads
pass

return summed_value

def value(self, name):
"""Return the value to a given named attribute."""
from knxip.core import KNXException
Expand Down