Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e1208e9
Migrate statistics to use timestamp columns
bdraco Feb 3, 2023
0b4b5b7
update queries
bdraco Feb 3, 2023
ee065e9
leave dupliate stats cleanup alone
bdraco Feb 3, 2023
4c5c6a3
leave dupliate stats cleanup alone
bdraco Feb 3, 2023
03218f8
lint
bdraco Feb 3, 2023
3f46b39
lint
bdraco Feb 3, 2023
d6f83a6
fixes
bdraco Feb 3, 2023
009c1b7
bump it
bdraco Feb 3, 2023
e32772a
fixes
bdraco Feb 3, 2023
59395e6
fixes
bdraco Feb 3, 2023
60d4ba6
fixes
bdraco Feb 3, 2023
6fd6055
Merge branch 'dev' into stats_migration
bdraco Feb 3, 2023
39a6c18
tweak language
bdraco Feb 3, 2023
5f7c96a
tweak language
bdraco Feb 3, 2023
aa6460a
fix version
bdraco Feb 3, 2023
1257a73
make sure to cleanup old indexes when we are done as well
bdraco Feb 3, 2023
05aef96
make sure to cleanup old indexes when we are done as well
bdraco Feb 3, 2023
9ad6e8b
tweak
bdraco Feb 3, 2023
bcf5dd7
drop index after cleanup
bdraco Feb 3, 2023
d8d82d3
Merge branch 'dev' into stats_migration
bdraco Feb 5, 2023
77b63e7
tweak
bdraco Feb 5, 2023
21a625c
tweak
bdraco Feb 5, 2023
dac7e4a
missing fstring
bdraco Feb 5, 2023
c0398e7
Merge branch 'dev' into stats_migration
bdraco Feb 5, 2023
cb28009
relo code to match design
bdraco Feb 5, 2023
b83babb
Merge branch 'dev' into stats_migration
bdraco Feb 6, 2023
11096ff
set the time zone to utc for the connection with mysql so we avoid th…
bdraco Feb 6, 2023
4757bca
update test
bdraco Feb 6, 2023
198068f
Merge branch 'dev' into stats_migration
bdraco Feb 7, 2023
51e4914
Merge branch 'dev' into stats_migration
bdraco Feb 7, 2023
22ff378
Merge branch 'dev' into stats_migration
bdraco Feb 7, 2023
349842e
Merge remote-tracking branch 'upstream/dev' into stats_migration
bdraco Feb 8, 2023
27dc5cb
move asserts
bdraco Feb 8, 2023
c661870
simplify math
bdraco Feb 8, 2023
0f2ae5f
Merge branch 'dev' into stats_migration
bdraco Feb 8, 2023
7212fdc
tweak
bdraco Feb 9, 2023
8d856c8
Apply suggestions from code review
bdraco Feb 9, 2023
a81167f
Update homeassistant/components/recorder/migration.py
bdraco Feb 9, 2023
1a819ef
Merge branch 'dev' into stats_migration
bdraco Feb 9, 2023
8b95851
Merge branch 'dev' into stats_migration
bdraco Feb 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions homeassistant/components/recorder/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,10 +638,8 @@ def run(self) -> None:
else:
persistent_notification.create(
self.hass,
(
"The database migration failed, check [the logs](/config/logs)."
"Database Migration Failed"
),
"The database migration failed, check [the logs](/config/logs).",
"Database Migration Failed",
"recorder_database_migration",
)
self.hass.add_job(self.async_set_db_ready)
Expand Down Expand Up @@ -730,8 +728,10 @@ def _migrate_schema_and_setup_run(
(
"System performance will temporarily degrade during the database"
" upgrade. Do not power down or restart the system until the upgrade"
" completes. Integrations that read the database, such as logbook and"
" history, may return inconsistent results until the upgrade completes."
" completes. Integrations that read the database, such as logbook,"
" history, and statistics may return inconsistent results until the "
" upgrade completes. This notification will be automatically dismissed"
" when the upgrade completes."
),
"Database upgrade in progress",
"recorder_database_migration",
Expand Down Expand Up @@ -1027,11 +1027,7 @@ def _open_event_session(self) -> None:

def _post_schema_migration(self, old_version: int, new_version: int) -> None:
"""Run post schema migration tasks."""
assert self.engine is not None
assert self.event_session is not None
migration.post_schema_migration(
self.engine, self.event_session, old_version, new_version
)
migration.post_schema_migration(self, old_version, new_version)

def _send_keep_alive(self) -> None:
"""Send a keep alive to keep the db connection open."""
Expand Down
45 changes: 36 additions & 9 deletions homeassistant/components/recorder/db_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@
import homeassistant.util.dt as dt_util

from .const import ALL_DOMAIN_EXCLUDE_ATTRS, SupportedDialect
from .models import StatisticData, StatisticMetaData, process_timestamp
from .models import (
StatisticData,
StatisticMetaData,
datetime_to_timestamp_or_none,
process_timestamp,
)


# SQLAlchemy Schema
Expand All @@ -61,7 +66,7 @@ class Base(DeclarativeBase):
"""Base class for tables."""


SCHEMA_VERSION = 33
SCHEMA_VERSION = 35

_LOGGER = logging.getLogger(__name__)

Expand All @@ -76,6 +81,8 @@ class Base(DeclarativeBase):
TABLE_STATISTICS_RUNS = "statistics_runs"
TABLE_STATISTICS_SHORT_TERM = "statistics_short_term"

STATISTICS_TABLES = ("statistics", "statistics_short_term")

MAX_STATE_ATTRS_BYTES = 16384
PSQL_DIALECT = SupportedDialect.POSTGRESQL

Expand Down Expand Up @@ -502,17 +509,24 @@ class StatisticsBase:
"""Statistics base class."""

id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
created: Mapped[datetime] = mapped_column(DATETIME_TYPE, default=dt_util.utcnow)
created: Mapped[datetime] = mapped_column(
DATETIME_TYPE, default=dt_util.utcnow
) # No longer used
created_ts: Mapped[float] = mapped_column(TIMESTAMP_TYPE, default=time.time)
metadata_id: Mapped[int | None] = mapped_column(
Integer,
ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"),
index=True,
)
start: Mapped[datetime | None] = mapped_column(DATETIME_TYPE, index=True)
start: Mapped[datetime | None] = mapped_column(
DATETIME_TYPE, index=True
) # No longer used
start_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, index=True)
mean: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
min: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
max: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
last_reset: Mapped[datetime | None] = mapped_column(DATETIME_TYPE)
last_reset_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE)
state: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
sum: Mapped[float | None] = mapped_column(DOUBLE_TYPE)

Expand All @@ -521,9 +535,17 @@ class StatisticsBase:
@classmethod
def from_stats(cls, metadata_id: int, stats: StatisticData) -> Self:
"""Create object from a statistics."""
return cls( # type: ignore[call-arg,misc]
return cls( # type: ignore[call-arg]
metadata_id=metadata_id,
**stats,
start=None,
start_ts=dt_util.utc_to_timestamp(stats["start"]),
mean=stats.get("mean"),
min=stats.get("min"),
max=stats.get("max"),
last_reset=None,
last_reset_ts=datetime_to_timestamp_or_none(stats.get("last_reset")),
state=stats.get("state"),
sum=stats.get("sum"),
)


Expand All @@ -534,7 +556,12 @@ class Statistics(Base, StatisticsBase):

__table_args__ = (
# Used for fetching statistics for a certain entity at a specific time
Index("ix_statistics_statistic_id_start", "metadata_id", "start", unique=True),
Index(
"ix_statistics_statistic_id_start_ts",
"metadata_id",
"start_ts",
unique=True,
),
)
__tablename__ = TABLE_STATISTICS

Expand All @@ -547,9 +574,9 @@ class StatisticsShortTerm(Base, StatisticsBase):
__table_args__ = (
# Used for fetching statistics for a certain entity at a specific time
Index(
"ix_statistics_short_term_statistic_id_start",
"ix_statistics_short_term_statistic_id_start_ts",
"metadata_id",
"start",
"start_ts",
unique=True,
),
)
Expand Down
185 changes: 151 additions & 34 deletions homeassistant/components/recorder/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .const import SupportedDialect
from .db_schema import (
SCHEMA_VERSION,
STATISTICS_TABLES,
TABLE_STATES,
Base,
SchemaChanges,
Expand All @@ -43,14 +44,19 @@
get_start_time,
validate_db_schema as statistics_validate_db_schema,
)
from .tasks import CommitTask, PostSchemaMigrationTask
from .tasks import (
CommitTask,
PostSchemaMigrationTask,
StatisticsTimestampMigrationCleanupTask,
)
from .util import session_scope

if TYPE_CHECKING:
from . import Recorder

LIVE_MIGRATION_MIN_SCHEMA_VERSION = 0


_LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -274,13 +280,21 @@ def _drop_index(
"Finished dropping index %s from table %s", index_name, table_name
)
else:
if index_name in ("ix_states_entity_id", "ix_states_context_parent_id"):
if index_name in (
"ix_states_entity_id",
"ix_states_context_parent_id",
"ix_statistics_short_term_statistic_id_start",
"ix_statistics_statistic_id_start",
):
# ix_states_context_parent_id was only there on nightly so we do not want
# to generate log noise or issues about it.
#
# ix_states_entity_id was only there for users who upgraded from schema
# version 8 or earlier. Newer installs will not have it so we do not
# want to generate log noise or issues about it.
#
# ix_statistics_short_term_statistic_id_start and ix_statistics_statistic_id_start
# were only there for users who upgraded from schema version 23 or earlier.
return

_LOGGER.warning(
Expand Down Expand Up @@ -764,35 +778,9 @@ def _apply_update( # noqa: C901
# Add name column to StatisticsMeta
_add_columns(session_maker, "statistics_meta", ["name VARCHAR(255)"])
elif new_version == 24:
# Recreate statistics indices to block duplicated statistics
_drop_index(session_maker, "statistics", "ix_statistics_statistic_id_start")
_drop_index(
session_maker,
"statistics_short_term",
"ix_statistics_short_term_statistic_id_start",
)
try:
_create_index(
session_maker, "statistics", "ix_statistics_statistic_id_start"
)
_create_index(
session_maker,
"statistics_short_term",
"ix_statistics_short_term_statistic_id_start",
)
except DatabaseError:
# There may be duplicated statistics entries, delete duplicated statistics
# and try again
with session_scope(session=session_maker()) as session:
delete_statistics_duplicates(hass, session)
_create_index(
session_maker, "statistics", "ix_statistics_statistic_id_start"
)
_create_index(
session_maker,
"statistics_short_term",
"ix_statistics_short_term_statistic_id_start",
)
_LOGGER.debug("Deleting duplicated statistics entries")
with session_scope(session=session_maker()) as session:
delete_statistics_duplicates(hass, session)
elif new_version == 25:
_add_columns(session_maker, "states", [f"attributes_id {big_int}"])
_create_index(session_maker, "states", "ix_states_attributes_id")
Expand Down Expand Up @@ -881,13 +869,62 @@ def _apply_update( # noqa: C901
# when querying the states table.
# https://github.com/home-assistant/core/issues/83787
_drop_index(session_maker, "states", "ix_states_entity_id")
elif new_version == 34:
# Once we require SQLite >= 3.35.5, we should drop the columns:
# ALTER TABLE statistics DROP COLUMN created
# ALTER TABLE statistics DROP COLUMN start
# ALTER TABLE statistics DROP COLUMN last_reset
# ALTER TABLE statistics_short_term DROP COLUMN created
# ALTER TABLE statistics_short_term DROP COLUMN start
# ALTER TABLE statistics_short_term DROP COLUMN last_reset
_add_columns(
session_maker,
"statistics",
[
f"created_ts {timestamp_type}",
f"start_ts {timestamp_type}",
f"last_reset_ts {timestamp_type}",
],
)
_add_columns(
session_maker,
"statistics_short_term",
[
f"created_ts {timestamp_type}",
f"start_ts {timestamp_type}",
f"last_reset_ts {timestamp_type}",
],
)
_create_index(session_maker, "statistics", "ix_statistics_start_ts")
_create_index(
session_maker, "statistics", "ix_statistics_statistic_id_start_ts"
)
_create_index(
session_maker, "statistics_short_term", "ix_statistics_short_term_start_ts"
)
_create_index(
session_maker,
"statistics_short_term",
"ix_statistics_short_term_statistic_id_start_ts",
)
_migrate_statistics_columns_to_timestamp(session_maker, engine)
elif new_version == 35:
# Migration is done in two steps to ensure we can start using
# the new columns before we wipe the old ones.
_drop_index(session_maker, "statistics", "ix_statistics_statistic_id_start")
_drop_index(
session_maker,
"statistics_short_term",
"ix_statistics_short_term_statistic_id_start",
)
# ix_statistics_start and ix_statistics_statistic_id_start are still used
# for the post migration cleanup and can be removed in a future version.
else:
raise ValueError(f"No schema migration defined for version {new_version}")


def post_schema_migration(
engine: Engine,
session: Session,
instance: Recorder,
old_version: int,
new_version: int,
) -> None:
Expand All @@ -905,7 +942,19 @@ def post_schema_migration(
# In version 31 we migrated all the time_fired, last_updated, and last_changed
# columns to be timestamps. In version 32 we need to wipe the old columns
# since they are no longer used and take up a significant amount of space.
_wipe_old_string_time_columns(engine, session)
assert instance.event_session is not None
assert instance.engine is not None
_wipe_old_string_time_columns(instance.engine, instance.event_session)
if old_version < 35 <= new_version:
# In version 34 we migrated all the created, start, and last_reset
# columns to be timestamps. In version 34 we need to wipe the old columns
# since they are no longer used and take up a significant amount of space.
_wipe_old_string_statistics_columns(instance)


def _wipe_old_string_statistics_columns(instance: Recorder) -> None:
"""Wipe old string statistics columns to save space."""
instance.queue_task(StatisticsTimestampMigrationCleanupTask())


def _wipe_old_string_time_columns(engine: Engine, session: Session) -> None:
Expand Down Expand Up @@ -1048,6 +1097,74 @@ def _migrate_columns_to_timestamp(
)


def _migrate_statistics_columns_to_timestamp(
session_maker: Callable[[], Session], engine: Engine
) -> None:
"""Migrate statistics columns to use timestamp."""
# Migrate all data in statistics.start to statistics.start_ts
# Migrate all data in statistics.created to statistics.created_ts
# Migrate all data in statistics.last_reset to statistics.last_reset_ts
# Migrate all data in statistics_short_term.start to statistics_short_term.start_ts
# Migrate all data in statistics_short_term.created to statistics_short_term.created_ts
# Migrate all data in statistics_short_term.last_reset to statistics_short_term.last_reset_ts
result: CursorResult | None = None
if engine.dialect.name == SupportedDialect.SQLITE:
# With SQLite we do this in one go since it is faster
for table in STATISTICS_TABLES:
with session_scope(session=session_maker()) as session:
session.connection().execute(
text(
f"UPDATE {table} set start_ts=strftime('%s',start) + "
"cast(substr(start,-7) AS FLOAT), "
f"created_ts=strftime('%s',created) + "
"cast(substr(created,-7) AS FLOAT), "
f"last_reset_ts=strftime('%s',last_reset) + "
"cast(substr(last_reset,-7) AS FLOAT);"
)
)
elif engine.dialect.name == SupportedDialect.MYSQL:
# With MySQL we do this in chunks to avoid hitting the `innodb_buffer_pool_size` limit
# We also need to do this in a loop since we can't be sure that we have
# updated all rows in the table until the rowcount is 0
for table in STATISTICS_TABLES:
result = None
while result is None or result.rowcount > 0: # type: ignore[unreachable]
with session_scope(session=session_maker()) as session:
result = session.connection().execute(
text(
f"UPDATE {table} set start_ts="
"IF(start is NULL,0,"
"UNIX_TIMESTAMP(start) "
"), "
"created_ts="
"UNIX_TIMESTAMP(created), "
"last_reset_ts="
"UNIX_TIMESTAMP(last_reset) "
"where start_ts is NULL "
"LIMIT 250000;"
)
)
elif engine.dialect.name == SupportedDialect.POSTGRESQL:
# With Postgresql we do this in chunks to avoid using too much memory
# We also need to do this in a loop since we can't be sure that we have
# updated all rows in the table until the rowcount is 0
for table in STATISTICS_TABLES:
result = None
while result is None or result.rowcount > 0: # type: ignore[unreachable]
with session_scope(session=session_maker()) as session:
result = session.connection().execute(
text(
f"UPDATE {table} set start_ts=" # nosec
"(case when start is NULL then 0 else EXTRACT(EPOCH FROM start) end), "
"created_ts=EXTRACT(EPOCH FROM created), "
"last_reset_ts=EXTRACT(EPOCH FROM last_reset) "
"where id IN ( "
f"SELECT id FROM {table} where start_ts is NULL LIMIT 250000 "
" );"
)
)


def _initialize_database(session: Session) -> bool:
"""Initialize a new database.

Expand Down
Loading