Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions homeassistant/components/recorder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@
SERVICE_PURGE = 'purge'

ATTR_KEEP_DAYS = 'keep_days'
ATTR_REPACK = 'repack'

SERVICE_PURGE_SCHEMA = vol.Schema({
vol.Required(ATTR_KEEP_DAYS):
vol.All(vol.Coerce(int), vol.Range(min=0))
vol.Optional(ATTR_KEEP_DAYS):
vol.All(vol.Coerce(int), vol.Range(min=0)),
vol.Optional(ATTR_REPACK, default=False): cv.boolean
})

DEFAULT_URL = 'sqlite:///{hass_config_path}'
Expand Down Expand Up @@ -76,7 +78,7 @@

CONFIG_SCHEMA = vol.Schema({
DOMAIN: FILTER_SCHEMA.extend({
vol.Optional(CONF_PURGE_KEEP_DAYS):
vol.Optional(CONF_PURGE_KEEP_DAYS, default=10):
vol.All(vol.Coerce(int), vol.Range(min=1)),
vol.Optional(CONF_PURGE_INTERVAL, default=1):
vol.All(vol.Coerce(int), vol.Range(min=0)),
Expand Down Expand Up @@ -122,12 +124,6 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
keep_days = conf.get(CONF_PURGE_KEEP_DAYS)
purge_interval = conf.get(CONF_PURGE_INTERVAL)

if keep_days is None and purge_interval != 0:
_LOGGER.warning(
"From version 0.64.0 the 'recorder' component will by default "
"purge data older than 10 days. To keep data longer you must "
"configure 'purge_keep_days' or 'purge_interval'.")

db_url = conf.get(CONF_DB_URL, None)
if not db_url:
db_url = DEFAULT_URL.format(
Expand All @@ -144,7 +140,7 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
@asyncio.coroutine
def async_handle_purge_service(service):
"""Handle calls to the purge service."""
instance.do_adhoc_purge(service.data[ATTR_KEEP_DAYS])
instance.do_adhoc_purge(**service.data)

hass.services.async_register(
DOMAIN, SERVICE_PURGE, async_handle_purge_service,
Expand All @@ -153,7 +149,7 @@ def async_handle_purge_service(service):
return (yield from instance.async_db_ready)


PurgeTask = namedtuple('PurgeTask', ['keep_days'])
PurgeTask = namedtuple('PurgeTask', ['keep_days', 'repack'])


class Recorder(threading.Thread):
Expand Down Expand Up @@ -188,10 +184,12 @@ def async_initialize(self):
"""Initialize the recorder."""
self.hass.bus.async_listen(MATCH_ALL, self.event_listener)

def do_adhoc_purge(self, keep_days):
def do_adhoc_purge(self, **kwargs):
"""Trigger an adhoc purge retaining keep_days worth of data."""
if keep_days is not None:
self.queue.put(PurgeTask(keep_days))
keep_days = kwargs.get(ATTR_KEEP_DAYS, self.keep_days)
repack = kwargs.get(ATTR_REPACK)

self.queue.put(PurgeTask(keep_days, repack))

def run(self):
"""Start processing events to save."""
Expand Down Expand Up @@ -261,7 +259,8 @@ def notify_hass_started(event):
@callback
def async_purge(now):
"""Trigger the purge and schedule the next run."""
self.queue.put(PurgeTask(self.keep_days))
self.queue.put(
PurgeTask(self.keep_days, repack=not self.did_vacuum))
self.hass.helpers.event.async_track_point_in_time(
async_purge, now + timedelta(days=self.purge_interval))

Expand Down Expand Up @@ -294,7 +293,7 @@ def async_purge(now):
self.queue.task_done()
return
elif isinstance(event, PurgeTask):
purge.purge_old_data(self, event.keep_days)
purge.purge_old_data(self, event.keep_days, event.repack)
self.queue.task_done()
continue
elif event.event_type == EVENT_TIME_CHANGED:
Expand Down
7 changes: 4 additions & 3 deletions homeassistant/components/recorder/purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
_LOGGER = logging.getLogger(__name__)


def purge_old_data(instance, purge_days):
def purge_old_data(instance, purge_days, repack):
"""Purge events and states older than purge_days ago."""
from .models import States, Events
from sqlalchemy import orm
from sqlalchemy.sql import exists

purge_before = dt_util.utcnow() - timedelta(days=purge_days)
_LOGGER.debug("Purging events before %s", purge_before)

with session_scope(session=instance.get_session()) as session:
# For each entity, the most recent state is protected from deletion
Expand Down Expand Up @@ -55,10 +56,10 @@ def purge_old_data(instance, purge_days):

# Execute sqlite vacuum command to free up space on disk
_LOGGER.debug("DB engine driver: %s", instance.engine.driver)
if instance.engine.driver == 'pysqlite' and not instance.did_vacuum:
if repack and instance.engine.driver == 'pysqlite':
from sqlalchemy import exc

_LOGGER.info("Vacuuming SQLite to free space")
_LOGGER.debug("Vacuuming SQLite to free space")
try:
instance.engine.execute("VACUUM")
instance.did_vacuum = True
Expand Down
3 changes: 3 additions & 0 deletions homeassistant/components/recorder/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ purge:
keep_days:
description: Number of history days to keep in database after purge. Value >= 0.
example: 2
repack:
description: Attempt to save disk space by rewriting the entire database file.
example: true
4 changes: 0 additions & 4 deletions homeassistant/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,6 @@
# Enables support for tracking state changes over time
history:

# Tracked history is kept for 10 days
recorder:
purge_keep_days: 10

# View all events in a logbook
logbook:

Expand Down
66 changes: 41 additions & 25 deletions tests/components/recorder/test_purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ class TestRecorderPurge(unittest.TestCase):

def setUp(self): # pylint: disable=invalid-name
"""Setup things to be run when tests are started."""
config = {'purge_keep_days': 4, 'purge_interval': 2}
self.hass = get_test_home_assistant()
init_recorder_component(self.hass, config)
init_recorder_component(self.hass)
self.hass.start()

def tearDown(self): # pylint: disable=invalid-name
Expand All @@ -29,14 +28,18 @@ def _add_test_states(self):
"""Add multiple states to the db for testing."""
now = datetime.now()
five_days_ago = now - timedelta(days=5)
eleven_days_ago = now - timedelta(days=11)
attributes = {'test_attr': 5, 'test_attr_10': 'nice'}

self.hass.block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()

with recorder.session_scope(hass=self.hass) as session:
for event_id in range(5):
if event_id < 3:
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
state = 'autopurgeme'
elif event_id < 4:
timestamp = five_days_ago
state = 'purgeme'
else:
Expand Down Expand Up @@ -65,24 +68,28 @@ def _add_test_states(self):
domain='sensor',
state='iamprotected',
attributes=json.dumps(attributes),
last_changed=five_days_ago,
last_updated=five_days_ago,
created=five_days_ago,
last_changed=eleven_days_ago,
last_updated=eleven_days_ago,
created=eleven_days_ago,
event_id=protected_event_id
))

def _add_test_events(self):
"""Add a few events for testing."""
now = datetime.now()
five_days_ago = now - timedelta(days=5)
eleven_days_ago = now - timedelta(days=11)
event_data = {'test_attr': 5, 'test_attr_10': 'nice'}

self.hass.block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()

with recorder.session_scope(hass=self.hass) as session:
for event_id in range(5):
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
event_type = 'EVENT_TEST_AUTOPURGE'
elif event_id < 4:
timestamp = five_days_ago
event_type = 'EVENT_TEST_PURGE'
else:
Expand All @@ -102,8 +109,8 @@ def _add_test_events(self):
event_type='EVENT_TEST_FOR_PROTECTED',
event_data=json.dumps(event_data),
origin='LOCAL',
created=five_days_ago,
time_fired=five_days_ago,
created=eleven_days_ago,
time_fired=eleven_days_ago,
)
session.add(protected_event)
session.flush()
Expand All @@ -113,13 +120,13 @@ def _add_test_events(self):
def test_purge_old_states(self):
"""Test deleting old states."""
self._add_test_states()
# make sure we start with 6 states
# make sure we start with 7 states
with session_scope(hass=self.hass) as session:
states = session.query(States)
self.assertEqual(states.count(), 6)
self.assertEqual(states.count(), 7)

# run purge_old_data()
purge_old_data(self.hass.data[DATA_INSTANCE], 4)
purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)

# we should only have 3 states left after purging
self.assertEqual(states.count(), 3)
Expand All @@ -131,13 +138,13 @@ def test_purge_old_events(self):
with session_scope(hass=self.hass) as session:
events = session.query(Events).filter(
Events.event_type.like("EVENT_TEST%"))
self.assertEqual(events.count(), 6)
self.assertEqual(events.count(), 7)

# run purge_old_data()
purge_old_data(self.hass.data[DATA_INSTANCE], 4)
purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)

# now we should only have 3 events left
self.assertEqual(events.count(), 3)
# no state to protect, now we should only have 2 events left
self.assertEqual(events.count(), 2)

def test_purge_method(self):
"""Test purge method."""
Expand All @@ -148,24 +155,24 @@ def test_purge_method(self):
# make sure we start with 6 states
with session_scope(hass=self.hass) as session:
states = session.query(States)
self.assertEqual(states.count(), 6)
self.assertEqual(states.count(), 7)

events = session.query(Events).filter(
Events.event_type.like("EVENT_TEST%"))
self.assertEqual(events.count(), 6)
self.assertEqual(events.count(), 7)

self.hass.data[DATA_INSTANCE].block_till_done()

# run purge method - no service data, should not work
# run purge method - no service data, use defaults
self.hass.services.call('recorder', 'purge')
self.hass.async_block_till_done()

# Small wait for recorder thread
self.hass.data[DATA_INSTANCE].block_till_done()

# we should still have everything from before
self.assertEqual(states.count(), 6)
self.assertEqual(events.count(), 6)
# only purged old events
self.assertEqual(states.count(), 5)
self.assertEqual(events.count(), 5)

# run purge method - correct service data
self.hass.services.call('recorder', 'purge',
Expand All @@ -182,11 +189,20 @@ def test_purge_method(self):
self.assertTrue('iamprotected' in (
state.state for state in states))

# now we should only have 4 events left
self.assertEqual(events.count(), 4)
# now we should only have 3 events left
self.assertEqual(events.count(), 3)

# and the protected event is among them
self.assertTrue('EVENT_TEST_FOR_PROTECTED' in (
event.event_type for event in events.all()))
self.assertFalse('EVENT_TEST_PURGE' in (
event.event_type for event in events.all()))

# run purge method - correct service data, with repack
service_data['repack'] = True
self.assertFalse(self.hass.data[DATA_INSTANCE].did_vacuum)
self.hass.services.call('recorder', 'purge',
service_data=service_data)
self.hass.async_block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()
self.assertTrue(self.hass.data[DATA_INSTANCE].did_vacuum)