Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions homeassistant/components/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,30 @@ def state_changes_during_period(hass, start_time, end_time=None,
return states_to_json(hass, states, start_time, entity_ids)


def get_last_state_changes(hass, number_of_states, entity_id):
"""Return the last number_of_states."""
from homeassistant.components.recorder.models import States

start_time = dt_util.utcnow()

with session_scope(hass=hass) as session:
query = session.query(States).filter(
(States.last_changed == States.last_updated))

if entity_id is not None:
query = query.filter_by(entity_id=entity_id.lower())

entity_ids = [entity_id] if entity_id is not None else None

states = execute(
query.order_by(States.last_updated.desc()).limit(number_of_states))

return states_to_json(hass, reversed(states),
start_time,
entity_ids,
include_start_time_state=False)


def get_states(hass, utc_point_in_time, entity_ids=None, run=None,
filters=None):
"""Return the states at a specific point in time."""
Expand Down
158 changes: 122 additions & 36 deletions homeassistant/components/sensor/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import statistics
from collections import deque, Counter
from numbers import Number
from functools import partial
from copy import copy
from datetime import timedelta

import voluptuous as vol

Expand All @@ -20,6 +23,7 @@
from homeassistant.util.decorator import Registry
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_track_state_change
import homeassistant.components.history as history
import homeassistant.util.dt as dt_util

_LOGGER = logging.getLogger(__name__)
Expand All @@ -40,6 +44,9 @@

TIME_SMA_LAST = 'last'

WINDOW_SIZE_UNIT_NUMBER_EVENTS = 1
WINDOW_SIZE_UNIT_TIME = 2

DEFAULT_WINDOW_SIZE = 1
DEFAULT_PRECISION = 2
DEFAULT_FILTER_RADIUS = 2.0
Expand Down Expand Up @@ -123,21 +130,22 @@ def __init__(self, name, entity_id, filters):
async def async_added_to_hass(self):
"""Register callbacks."""
@callback
def filter_sensor_state_listener(entity, old_state, new_state):
def filter_sensor_state_listener(entity, old_state, new_state,
update_ha=True):
"""Handle device state changes."""
if new_state.state in [STATE_UNKNOWN, STATE_UNAVAILABLE]:
return

temp_state = new_state.state
temp_state = new_state

try:
for filt in self._filters:
filtered_state = filt.filter_state(temp_state)
filtered_state = filt.filter_state(copy(temp_state))
_LOGGER.debug("%s(%s=%s) -> %s", filt.name,
self._entity,
temp_state,
temp_state.state,
"skip" if filt.skip_processing else
filtered_state)
filtered_state.state)
if filt.skip_processing:
return
temp_state = filtered_state
Expand All @@ -146,7 +154,7 @@ def filter_sensor_state_listener(entity, old_state, new_state):
self._state)
return

self._state = temp_state
self._state = temp_state.state

if self._icon is None:
self._icon = new_state.attributes.get(
Expand All @@ -156,7 +164,50 @@ def filter_sensor_state_listener(entity, old_state, new_state):
self._unit_of_measurement = new_state.attributes.get(
ATTR_UNIT_OF_MEASUREMENT)

self.async_schedule_update_ha_state()
if update_ha:
self.async_schedule_update_ha_state()

if 'recorder' in self.hass.config.components:
history_list = []
largest_window_items = 0
largest_window_time = timedelta(0)

# Determine the largest window_size by type
for filt in self._filters:
if filt.window_unit == WINDOW_SIZE_UNIT_NUMBER_EVENTS\
and largest_window_items < filt.window_size:
largest_window_items = filt.window_size
elif filt.window_unit == WINDOW_SIZE_UNIT_TIME\
and largest_window_time < filt.window_size:
largest_window_time = filt.window_size

# Retrieve the largest window_size of each type
if largest_window_items > 0:
filter_history = await self.hass.async_add_job(partial(
history.get_last_state_changes, self.hass,
largest_window_items, entity_id=self._entity))
history_list.extend(
[state for state in filter_history[self._entity]])
if largest_window_time > timedelta(seconds=0):
start = dt_util.utcnow() - largest_window_time
filter_history = await self.hass.async_add_job(partial(
history.state_changes_during_period, self.hass,
start, entity_id=self._entity))
history_list.extend(
[state for state in filter_history[self._entity]
if state not in history_list])

# Sort the window states
history_list = sorted(history_list, key=lambda s: s.last_updated)
_LOGGER.debug("Loading from history: %s",
[(s.state, s.last_updated) for s in history_list])

# Replay history through the filter chain
prev_state = None
for state in history_list:
filter_sensor_state_listener(
self._entity, prev_state, state, False)
prev_state = state

async_track_state_change(
self.hass, self._entity, filter_sensor_state_listener)
Expand Down Expand Up @@ -195,6 +246,31 @@ def device_state_attributes(self):
return state_attr


class FilterState(object):
"""State abstraction for filter usage."""

def __init__(self, state):
"""Initialize with HA State object."""
self.timestamp = state.last_updated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why call it timestamp instead of keeping the name last_updated? That will cause less confusion.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When working with time series, the name of the dimension is always timestamp. This is the correct name in the scientific area.

What causes confusion is last_updated vs last_changed. With the FilterState class I intended to isolate filter algorithms as much as possible from the guts of HA core.

try:
self.state = float(state.state)
except ValueError:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is dangerous. This means that if a non float state comes in, all filters will blow up because they are still assuming it's a float?

Copy link
Copy Markdown
Contributor Author

@dgomes dgomes Mar 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already had this discussion :)

Some filters like Throttle can work with non Numeric states.

I simply moved the conversion to float from each of the numeric filters into the parent class filter. This piece of code was just repeated over 3 different filters and missing from the Throttle filter.

self.state = state.state

def set_precision(self, precision):
"""Set precision of Number based states."""
if isinstance(self.state, Number):
self.state = round(float(self.state), precision)

def __str__(self):
"""Return state as the string representation of FilterState."""
return str(self.state)

def __repr__(self):
"""Return timestamp and state as the representation of FilterState."""
return "{} : {}".format(self.timestamp, self.state)


class Filter(object):
"""Filter skeleton.

Expand All @@ -207,11 +283,22 @@ class Filter(object):

def __init__(self, name, window_size=1, precision=None, entity=None):
"""Initialize common attributes."""
self.states = deque(maxlen=window_size)
if isinstance(window_size, int):
self.states = deque(maxlen=window_size)
self.window_unit = WINDOW_SIZE_UNIT_NUMBER_EVENTS
else:
self.states = deque(maxlen=0)
self.window_unit = WINDOW_SIZE_UNIT_TIME
self.precision = precision
self._name = name
self._entity = entity
self._skip_processing = False
self._window_size = window_size

@property
def window_size(self):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should have 2 vars. window size and time window ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be a very bad decision, there is only 1 dimension (window size) that can be described using different units (time vs #events). You don't keep 2 temperatures F and C...

The issue at hand is that the conversion between these units is non linear and requires a query (converting the #events into time) which is quite expensive.

Copy link
Copy Markdown
Member

@balloob balloob Mar 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For temperature we actually have another variable (unit_of_measurement) to describe the unit of the attribute. So if you want to make the comparison with temperature, let's add a variable that describes the type.

Copy link
Copy Markdown
Contributor Author

@dgomes dgomes Mar 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't really see the need for it, since we can already check that (is actually used throughout the code):

isinstance(filt.window_size, int) # #events
isinstance(filt.window_size, timedelta) # time

but I'm fine adding such method:

Would this work for you?

def window_size_unit(self):
  if isinstance(self.window_size, int):
    return WINDOW_SIZE_UNIT_TIME
  else:
    return WINDOW_SIZE_UNIT_NUMBER_EVENTS

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not assign it as an instance variable inside the if;else block inside the constructor of Filter.

"""Return window size."""
return self._window_size

@property
def name(self):
Expand All @@ -229,11 +316,11 @@ def _filter_state(self, new_state):

def filter_state(self, new_state):
"""Implement a common interface for filters."""
filtered = self._filter_state(new_state)
if isinstance(filtered, Number):
filtered = round(float(filtered), self.precision)
self.states.append(filtered)
return filtered
filtered = self._filter_state(FilterState(new_state))
filtered.set_precision(self.precision)
self.states.append(copy(filtered))
new_state.state = filtered.state
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is new_state here? is that a state object from Home Assistant?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because in that case you're not allowed to assign to it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no it is a FilterState object.

return new_state


@FILTERS.register(FILTER_NAME_OUTLIER)
Expand All @@ -254,11 +341,10 @@ def __init__(self, window_size, precision, entity, radius):

def _filter_state(self, new_state):
"""Implement the outlier filter."""
new_state = float(new_state)

if (self.states and
abs(new_state - statistics.median(self.states))
> self._radius):
abs(new_state.state -
statistics.median([s.state for s in self.states])) >
self._radius):

self._stats_internal['erasures'] += 1

Expand All @@ -284,16 +370,15 @@ def __init__(self, window_size, precision, entity, time_constant):

def _filter_state(self, new_state):
"""Implement the low pass filter."""
new_state = float(new_state)

if not self.states:
return new_state

new_weight = 1.0 / self._time_constant
prev_weight = 1.0 - new_weight
filtered = prev_weight * self.states[-1] + new_weight * new_state
new_state.state = prev_weight * self.states[-1].state +\
new_weight * new_state.state

return filtered
return new_state


@FILTERS.register(FILTER_NAME_TIME_SMA)
Expand All @@ -308,35 +393,36 @@ class TimeSMAFilter(Filter):

def __init__(self, window_size, precision, entity, type):
"""Initialize Filter."""
super().__init__(FILTER_NAME_TIME_SMA, 0, precision, entity)
self._time_window = int(window_size.total_seconds())
super().__init__(FILTER_NAME_TIME_SMA, window_size, precision, entity)
self._time_window = window_size
self.last_leak = None
self.queue = deque()

def _leak(self, now):
def _leak(self, left_boundary):
"""Remove timeouted elements."""
while self.queue:
timestamp, _ = self.queue[0]
if timestamp + self._time_window <= now:
if self.queue[0].timestamp + self._time_window <= left_boundary:
self.last_leak = self.queue.popleft()
else:
return

def _filter_state(self, new_state):
now = int(dt_util.utcnow().timestamp())
"""Implement the Simple Moving Average filter."""
self._leak(new_state.timestamp)
self.queue.append(copy(new_state))

self._leak(now)
self.queue.append((now, float(new_state)))
moving_sum = 0
start = now - self._time_window
_, prev_val = self.last_leak or (0, float(new_state))
start = new_state.timestamp - self._time_window
prev_state = self.last_leak or self.queue[0]
for state in self.queue:
moving_sum += (state.timestamp-start).total_seconds()\
* prev_state.state
start = state.timestamp
prev_state = state

for timestamp, val in self.queue:
moving_sum += (timestamp-start)*prev_val
start, prev_val = timestamp, val
moving_sum += (now-start)*prev_val
new_state.state = moving_sum / self._time_window.total_seconds()

return moving_sum/self._time_window
return new_state


@FILTERS.register(FILTER_NAME_THROTTLE)
Expand Down
Loading