Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion homeassistant/components/recorder/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ def _process_one_task_or_event_or_recover(self, task: RecorderTask | Event) -> N
# that is pending before running the task
if TYPE_CHECKING:
assert isinstance(task, RecorderTask)
if not task.commit_before:
if task.commit_before:
Comment thread
bdraco marked this conversation as resolved.
self._commit_event_session_or_retry()
return task.run(self)
except exc.DatabaseError as err:
Expand Down
7 changes: 5 additions & 2 deletions homeassistant/components/recorder/entity_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from homeassistant.helpers.start import async_at_start

from .core import Recorder
from .util import get_instance, session_scope
from .util import filter_unique_constraint_integrity_error, get_instance, session_scope

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,7 +61,10 @@ def update_states_metadata(
)
return

with session_scope(session=instance.get_session()) as session:
with session_scope(
session=instance.get_session(),
exception_filter=filter_unique_constraint_integrity_error(instance, "state"),
Comment thread
bdraco marked this conversation as resolved.
) as session:
if not states_meta_manager.update_metadata(session, entity_id, new_entity_id):
_LOGGER.warning(
"Cannot migrate history for entity_id `%s` to `%s` "
Expand Down
68 changes: 14 additions & 54 deletions homeassistant/components/recorder/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from collections import defaultdict
from collections.abc import Callable, Iterable, Sequence
import contextlib
import dataclasses
from datetime import datetime, timedelta
from functools import lru_cache, partial
Expand All @@ -15,7 +14,7 @@

from sqlalchemy import Select, and_, bindparam, func, lambda_stmt, select, text
from sqlalchemy.engine.row import Row
from sqlalchemy.exc import SQLAlchemyError, StatementError
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.lambdas import StatementLambdaElement
import voluptuous as vol
Expand Down Expand Up @@ -72,6 +71,7 @@
from .util import (
execute,
execute_stmt_lambda_element,
filter_unique_constraint_integrity_error,
get_instance,
retryable_database_job,
session_scope,
Expand Down Expand Up @@ -454,7 +454,9 @@ def compile_missing_statistics(instance: Recorder) -> bool:

with session_scope(
session=instance.get_session(),
exception_filter=_filter_unique_constraint_integrity_error(instance),
exception_filter=filter_unique_constraint_integrity_error(
instance, "statistic"
),
) as session:
# Find the newest statistics run, if any
if last_run := session.query(func.max(StatisticsRuns.start)).scalar():
Expand Down Expand Up @@ -486,7 +488,9 @@ def compile_statistics(instance: Recorder, start: datetime, fire_events: bool) -
# Return if we already have 5-minute statistics for the requested period
with session_scope(
session=instance.get_session(),
exception_filter=_filter_unique_constraint_integrity_error(instance),
exception_filter=filter_unique_constraint_integrity_error(
instance, "statistic"
),
) as session:
modified_statistic_ids = _compile_statistics(
instance, session, start, fire_events
Expand Down Expand Up @@ -737,7 +741,9 @@ def update_statistics_metadata(
if new_statistic_id is not UNDEFINED and new_statistic_id is not None:
with session_scope(
session=instance.get_session(),
exception_filter=_filter_unique_constraint_integrity_error(instance),
exception_filter=filter_unique_constraint_integrity_error(
instance, "statistic"
),
) as session:
statistics_meta_manager.update_statistic_id(
session, DOMAIN, statistic_id, new_statistic_id
Expand Down Expand Up @@ -2246,54 +2252,6 @@ def async_add_external_statistics(
_async_import_statistics(hass, metadata, statistics)


def _filter_unique_constraint_integrity_error(
instance: Recorder,
) -> Callable[[Exception], bool]:
def _filter_unique_constraint_integrity_error(err: Exception) -> bool:
"""Handle unique constraint integrity errors."""
if not isinstance(err, StatementError):
return False

assert instance.engine is not None
dialect_name = instance.engine.dialect.name

ignore = False
if (
dialect_name == SupportedDialect.SQLITE
and "UNIQUE constraint failed" in str(err)
):
ignore = True
if (
dialect_name == SupportedDialect.POSTGRESQL
and err.orig
and hasattr(err.orig, "pgcode")
and err.orig.pgcode == "23505"
):
ignore = True
if (
dialect_name == SupportedDialect.MYSQL
and err.orig
and hasattr(err.orig, "args")
):
with contextlib.suppress(TypeError):
if err.orig.args[0] == 1062:
ignore = True

if ignore:
_LOGGER.warning(
(
"Blocked attempt to insert duplicated statistic rows, please report"
" at %s"
),
"https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22",
exc_info=err,
)

return ignore

return _filter_unique_constraint_integrity_error


def _import_statistics_with_session(
instance: Recorder,
session: Session,
Expand Down Expand Up @@ -2397,7 +2355,9 @@ def import_statistics(

with session_scope(
session=instance.get_session(),
exception_filter=_filter_unique_constraint_integrity_error(instance),
exception_filter=filter_unique_constraint_integrity_error(
instance, "statistic"
),
) as session:
return _import_statistics_with_session(
instance, session, metadata, statistics, table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,18 @@ def update_statistic_id(
recorder thread.
"""
self._assert_in_recorder_thread()
if self.get(session, new_statistic_id):
Comment thread
bdraco marked this conversation as resolved.
_LOGGER.error(
"Cannot rename statistic_id `%s` to `%s` because the new statistic_id is already in use",
old_statistic_id,
new_statistic_id,
)
return
session.query(StatisticsMeta).filter(
(StatisticsMeta.statistic_id == old_statistic_id)
& (StatisticsMeta.source == source)
).update({StatisticsMeta.statistic_id: new_statistic_id})
self._clear_cache([old_statistic_id, new_statistic_id])
self._clear_cache([old_statistic_id])

def delete(self, session: Session, statistic_ids: list[str]) -> None:
"""Clear statistics for a list of statistic_ids.
Expand Down
54 changes: 53 additions & 1 deletion homeassistant/components/recorder/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import annotations

from collections.abc import Callable, Collection, Generator, Iterable, Sequence
import contextlib
from contextlib import contextmanager
from datetime import date, datetime, timedelta
import functools
Expand All @@ -21,7 +22,7 @@
from sqlalchemy import inspect, text
from sqlalchemy.engine import Result, Row
from sqlalchemy.engine.interfaces import DBAPIConnection
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.exc import OperationalError, SQLAlchemyError, StatementError
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.lambdas import StatementLambdaElement
Expand Down Expand Up @@ -906,3 +907,54 @@ def get_index_by_name(session: Session, table_name: str, index_name: str) -> str
),
None,
)


def filter_unique_constraint_integrity_error(
instance: Recorder, row_type: str
) -> Callable[[Exception], bool]:
"""Create a filter for unique constraint integrity errors."""

def _filter_unique_constraint_integrity_error(err: Exception) -> bool:
"""Handle unique constraint integrity errors."""
if not isinstance(err, StatementError):
return False

assert instance.engine is not None
dialect_name = instance.engine.dialect.name

ignore = False
if (
dialect_name == SupportedDialect.SQLITE
and "UNIQUE constraint failed" in str(err)
):
ignore = True
if (
dialect_name == SupportedDialect.POSTGRESQL
and err.orig
and hasattr(err.orig, "pgcode")
and err.orig.pgcode == "23505"
):
ignore = True
if (
dialect_name == SupportedDialect.MYSQL
and err.orig
and hasattr(err.orig, "args")
):
with contextlib.suppress(TypeError):
if err.orig.args[0] == 1062:
ignore = True

if ignore:
_LOGGER.warning(
(
"Blocked attempt to insert duplicated %s rows, please report"
" at %s"
),
row_type,
"https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22",
exc_info=err,
)

return ignore

return _filter_unique_constraint_integrity_error
99 changes: 99 additions & 0 deletions tests/components/recorder/test_entity_registry.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""The tests for sensor recorder platform."""
from collections.abc import Callable
from unittest.mock import patch

import pytest
from sqlalchemy import select
from sqlalchemy.orm import Session

from homeassistant.components import recorder
from homeassistant.components.recorder import history
from homeassistant.components.recorder.db_schema import StatesMeta
from homeassistant.components.recorder.util import session_scope
Expand Down Expand Up @@ -260,4 +262,101 @@ def rename_entry():
assert _count_entity_id_in_states_meta(hass, session, "sensor.test99") == 1
assert _count_entity_id_in_states_meta(hass, session, "sensor.test1") == 1

# We should hit the safeguard in the states_meta_manager
assert "the new entity_id is already in use" in caplog.text

# We should not hit the safeguard in the entity_registry
assert "Blocked attempt to insert duplicated state rows" not in caplog.text


def test_rename_entity_collision_without_states_meta_safeguard(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
) -> None:
"""Test states meta is not migrated when there is a collision.

This test disables the safeguard in the states_meta_manager
and relies on the filter_unique_constraint_integrity_error safeguard.
"""
hass = hass_recorder()
setup_component(hass, "sensor", {})

entity_reg = mock_registry(hass)

@callback
def add_entry():
reg_entry = entity_reg.async_get_or_create(
"sensor",
"test",
"unique_0000",
suggested_object_id="test1",
)
assert reg_entry.entity_id == "sensor.test1"

hass.add_job(add_entry)
hass.block_till_done()

zero, four, states = record_states(hass)
hist = history.get_significant_states(
hass, zero, four, list(set(states) | {"sensor.test99", "sensor.test1"})
)
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
assert len(hist["sensor.test1"]) == 3

hass.states.set("sensor.test99", "collision")
hass.states.remove("sensor.test99")

hass.block_till_done()
wait_recording_done(hass)

# Verify history before collision
hist = history.get_significant_states(
hass, zero, four, list(set(states) | {"sensor.test99", "sensor.test1"})
)
assert len(hist["sensor.test1"]) == 3
assert len(hist["sensor.test99"]) == 2

instance = recorder.get_instance(hass)
# Patch out the safeguard in the states meta manager
# so that we hit the filter_unique_constraint_integrity_error safeguard in the entity_registry
with patch.object(instance.states_meta_manager, "get", return_value=None):
# Rename entity sensor.test1 to sensor.test99
@callback
def rename_entry():
entity_reg.async_update_entity(
"sensor.test1", new_entity_id="sensor.test99"
)

hass.add_job(rename_entry)
wait_recording_done(hass)

# History is not migrated on collision
hist = history.get_significant_states(
hass, zero, four, list(set(states) | {"sensor.test99", "sensor.test1"})
)
assert len(hist["sensor.test1"]) == 3
assert len(hist["sensor.test99"]) == 2

with session_scope(hass=hass) as session:
assert _count_entity_id_in_states_meta(hass, session, "sensor.test99") == 1

hass.states.set("sensor.test99", "post_migrate")
wait_recording_done(hass)

new_hist = history.get_significant_states(
hass,
zero,
dt_util.utcnow(),
list(set(states) | {"sensor.test99", "sensor.test1"}),
)
assert new_hist["sensor.test99"][-1].state == "post_migrate"
assert len(hist["sensor.test99"]) == 2

with session_scope(hass=hass) as session:
assert _count_entity_id_in_states_meta(hass, session, "sensor.test99") == 1
assert _count_entity_id_in_states_meta(hass, session, "sensor.test1") == 1

# We should not hit the safeguard in the states_meta_manager
assert "the new entity_id is already in use" not in caplog.text

# We should hit the safeguard in the entity_registry
assert "Blocked attempt to insert duplicated state rows" in caplog.text
Loading