Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions homeassistant/components/logbook/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,9 @@ def _legacy_select_events_context_id(
NOT_CONTEXT_ONLY,
)
.outerjoin(States, (Events.event_id == States.event_id))
.where(States.last_updated == States.last_changed)
.where(
(States.last_updated == States.last_changed) | States.last_changed.is_(None)
)
.where(_not_continuous_entity_matcher())
.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
Expand Down Expand Up @@ -302,7 +304,7 @@ def _select_states(start_day: dt, end_day: dt) -> Select:
"event_type"
),
literal(value=None, type_=sqlalchemy.Text).label("event_data"),
States.last_changed.label("time_fired"),
States.last_updated.label("time_fired"),
States.context_id.label("context_id"),
States.context_user_id.label("context_user_id"),
States.context_parent_id.label("context_parent_id"),
Expand All @@ -314,7 +316,9 @@ def _select_states(start_day: dt, end_day: dt) -> Select:
.outerjoin(old_state, (States.old_state_id == old_state.state_id))
.where(_missing_state_matcher(old_state))
.where(_not_continuous_entity_matcher())
.where(States.last_updated == States.last_changed)
.where(
(States.last_updated == States.last_changed) | States.last_changed.is_(None)
)
.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)
Expand Down
78 changes: 50 additions & 28 deletions homeassistant/components/recorder/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@
States.last_changed,
States.last_updated,
]
BASE_STATES_NO_LAST_UPDATED = [
BASE_STATES_NO_LAST_CHANGED = [
States.entity_id,
States.state,
States.last_changed,
literal(value=None, type_=Text).label("last_updated"),
literal(value=None, type_=Text).label("last_changed"),
States.last_updated,
]
QUERY_STATE_NO_ATTR = [
*BASE_STATES,
literal(value=None, type_=Text).label("attributes"),
literal(value=None, type_=Text).label("shared_attrs"),
]
QUERY_STATE_NO_ATTR_NO_LAST_UPDATED = [
*BASE_STATES_NO_LAST_UPDATED,
QUERY_STATE_NO_ATTR_NO_LAST_CHANGED = [
*BASE_STATES_NO_LAST_CHANGED,
literal(value=None, type_=Text).label("attributes"),
literal(value=None, type_=Text).label("shared_attrs"),
]
Expand All @@ -92,8 +92,8 @@
States.attributes,
literal(value=None, type_=Text).label("shared_attrs"),
]
QUERY_STATES_PRE_SCHEMA_25_NO_LAST_UPDATED = [
*BASE_STATES_NO_LAST_UPDATED,
QUERY_STATES_PRE_SCHEMA_25_NO_LAST_CHANGED = [
*BASE_STATES_NO_LAST_CHANGED,
States.attributes,
literal(value=None, type_=Text).label("shared_attrs"),
]
Expand All @@ -103,8 +103,8 @@
States.attributes,
StateAttributes.shared_attrs,
]
QUERY_STATES_NO_LAST_UPDATED = [
*BASE_STATES_NO_LAST_UPDATED,
QUERY_STATES_NO_LAST_CHANGED = [
*BASE_STATES_NO_LAST_CHANGED,
# Remove States.attributes once all attributes are in StateAttributes.shared_attrs
States.attributes,
StateAttributes.shared_attrs,
Expand All @@ -114,7 +114,7 @@


def bake_query_and_join_attributes(
hass: HomeAssistant, no_attributes: bool, include_last_updated: bool = True
hass: HomeAssistant, no_attributes: bool, include_last_changed: bool = True
) -> tuple[Any, bool]:
"""Return the initial backed query and if StateAttributes should be joined.

Expand All @@ -126,31 +126,31 @@ def bake_query_and_join_attributes(
# without the attributes fields and do not join the
# state_attributes table
if no_attributes:
if include_last_updated:
if include_last_changed:
return bakery(lambda s: s.query(*QUERY_STATE_NO_ATTR)), False
return (
bakery(lambda s: s.query(*QUERY_STATE_NO_ATTR_NO_LAST_UPDATED)),
bakery(lambda s: s.query(*QUERY_STATE_NO_ATTR_NO_LAST_CHANGED)),
False,
)
# If we in the process of migrating schema we do
# not want to join the state_attributes table as we
# do not know if it will be there yet
if recorder.get_instance(hass).schema_version < 25:
if include_last_updated:
if include_last_changed:
return (
bakery(lambda s: s.query(*QUERY_STATES_PRE_SCHEMA_25)),
False,
)
return (
bakery(lambda s: s.query(*QUERY_STATES_PRE_SCHEMA_25_NO_LAST_UPDATED)),
bakery(lambda s: s.query(*QUERY_STATES_PRE_SCHEMA_25_NO_LAST_CHANGED)),
False,
)
# Finally if no migration is in progress and no_attributes
# was not requested, we query both attributes columns and
# join state_attributes
if include_last_updated:
if include_last_changed:
return bakery(lambda s: s.query(*QUERY_STATES)), True
return bakery(lambda s: s.query(*QUERY_STATES_NO_LAST_UPDATED)), True
return bakery(lambda s: s.query(*QUERY_STATES_NO_LAST_CHANGED)), True


def async_setup(hass: HomeAssistant) -> None:
Expand Down Expand Up @@ -213,18 +213,21 @@ def _query_significant_states_with_session(
if _LOGGER.isEnabledFor(logging.DEBUG):
timer_start = time.perf_counter()

baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_changed=True
)

if entity_ids is not None and len(entity_ids) == 1:
if (
significant_changes_only
and split_entity_id(entity_ids[0])[0] not in SIGNIFICANT_DOMAINS
):
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_updated=False
hass, no_attributes, include_last_changed=False
)
baked_query += lambda q: q.filter(
States.last_changed == States.last_updated
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
)
elif significant_changes_only:
baked_query += lambda q: q.filter(
Expand All @@ -233,7 +236,10 @@ def _query_significant_states_with_session(
States.entity_id.like(entity_domain)
for entity_domain in SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE
],
(States.last_changed == States.last_updated),
(
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
),
)
)

Expand Down Expand Up @@ -360,11 +366,14 @@ def state_changes_during_period(
"""Return states changes during UTC period start_time - end_time."""
with session_scope(hass=hass) as session:
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_updated=False
hass, no_attributes, include_last_changed=False
)

baked_query += lambda q: q.filter(
(States.last_changed == States.last_updated)
(
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
)
& (States.last_updated > bindparam("start_time"))
)

Expand Down Expand Up @@ -424,10 +433,12 @@ def get_last_state_changes(

with session_scope(hass=hass) as session:
baked_query, join_attributes = bake_query_and_join_attributes(
hass, False, include_last_updated=False
hass, False, include_last_changed=False
)

baked_query += lambda q: q.filter(States.last_changed == States.last_updated)
baked_query += lambda q: q.filter(
(States.last_changed == States.last_updated) | States.last_changed.is_(None)
)

if entity_id is not None:
baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id"))
Expand Down Expand Up @@ -489,7 +500,9 @@ def _get_states_baked_query_for_entites(
no_attributes: bool = False,
) -> BakedQuery:
"""Baked query to get states for specific entities."""
baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_changed=True
)
baked_query += _most_recent_state_ids_entities_subquery
if join_attributes:
baked_query += lambda q: q.outerjoin(
Expand Down Expand Up @@ -540,7 +553,9 @@ def _get_states_baked_query_for_all(
no_attributes: bool = False,
) -> BakedQuery:
"""Baked query to get states for all entities."""
baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_changed=True
)
baked_query += _most_recent_state_ids_subquery
baked_query += _ignore_domains_filter
if filters:
Expand Down Expand Up @@ -599,7 +614,9 @@ def _get_single_entity_states_with_session(
) -> list[Row]:
# Use an entirely different (and extremely fast) query if we only
# have a single entity id
baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_changed=True
)
baked_query += lambda q: q.filter(
States.last_updated < bindparam("utc_point_in_time"),
States.entity_id == bindparam("entity_id"),
Expand Down Expand Up @@ -720,7 +737,12 @@ def _sorted_states_to_dict(
ent_results.append(
{
attr_state: state,
attr_last_changed: _process_timestamp(row.last_changed),
#
# minimal_response only makes sense with last_updated == last_changed
#
# We use last_updated for last_changed since it's the same
#
attr_last_changed: _process_timestamp(row.last_updated),
}
)
prev_state = state
Expand Down
74 changes: 42 additions & 32 deletions homeassistant/components/recorder/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class States(Base): # type: ignore[misc,valid-type]
event_id = Column(
Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True
)
last_changed = Column(DATETIME_TYPE, default=dt_util.utcnow)
last_changed = Column(DATETIME_TYPE)
last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True)
old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True)
attributes_id = Column(
Expand Down Expand Up @@ -291,12 +291,16 @@ def from_event(event: Event) -> States:
# None state means the state was removed from the state machine
if state is None:
dbstate.state = ""
dbstate.last_changed = event.time_fired
dbstate.last_updated = event.time_fired
dbstate.last_changed = None
return dbstate

dbstate.state = state.state
dbstate.last_updated = state.last_updated
if state.last_updated == state.last_changed:
dbstate.last_changed = None
else:
dbstate.state = state.state
dbstate.last_changed = state.last_changed
dbstate.last_updated = state.last_updated

return dbstate

Expand All @@ -308,21 +312,27 @@ def to_native(self, validate_entity_id: bool = True) -> State | None:
parent_id=self.context_parent_id,
)
try:
return State(
self.entity_id,
self.state,
# Join the state_attributes table on attributes_id to get the attributes
# for newer states
json.loads(self.attributes) if self.attributes else {},
process_timestamp(self.last_changed),
process_timestamp(self.last_updated),
context=context,
validate_entity_id=validate_entity_id,
)
attrs = json.loads(self.attributes) if self.attributes else {}
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting row to state: %s", self)
return None
if self.last_changed is None or self.last_changed == self.last_updated:
last_changed = last_updated = process_timestamp(self.last_updated)
else:
last_updated = process_timestamp(self.last_updated)
last_changed = process_timestamp(self.last_changed)
return State(
self.entity_id,
self.state,
# Join the state_attributes table on attributes_id to get the attributes
# for newer states
attrs,
last_changed,
last_updated,
context=context,
validate_entity_id=validate_entity_id,
)


class StateAttributes(Base): # type: ignore[misc,valid-type]
Expand Down Expand Up @@ -708,7 +718,10 @@ def context(self, value: Context) -> None:
def last_changed(self) -> datetime: # type: ignore[override]
"""Last changed datetime."""
if self._last_changed is None:
self._last_changed = process_timestamp(self._row.last_changed)
if (last_changed := self._row.last_changed) is not None:
self._last_changed = process_timestamp(last_changed)
else:
self._last_changed = self.last_updated
return self._last_changed

@last_changed.setter
Expand All @@ -720,10 +733,7 @@ def last_changed(self, value: datetime) -> None:
def last_updated(self) -> datetime: # type: ignore[override]
"""Last updated datetime."""
if self._last_updated is None:
if (last_updated := self._row.last_updated) is not None:
self._last_updated = process_timestamp(last_updated)
else:
self._last_updated = self.last_changed
self._last_updated = process_timestamp(self._row.last_updated)
return self._last_updated

@last_updated.setter
Expand All @@ -739,24 +749,24 @@ def as_dict(self) -> dict[str, Any]: # type: ignore[override]
To be used for JSON serialization.
"""
if self._last_changed is None and self._last_updated is None:
last_changed_isoformat = process_timestamp_to_utc_isoformat(
self._row.last_changed
last_updated_isoformat = process_timestamp_to_utc_isoformat(
self._row.last_updated
)
if (
self._row.last_updated is None
self._row.last_changed is None
or self._row.last_changed == self._row.last_updated
):
last_updated_isoformat = last_changed_isoformat
last_changed_isoformat = last_updated_isoformat
else:
last_updated_isoformat = process_timestamp_to_utc_isoformat(
self._row.last_updated
last_changed_isoformat = process_timestamp_to_utc_isoformat(
self._row.last_changed
)
else:
last_changed_isoformat = self.last_changed.isoformat()
last_updated_isoformat = self.last_updated.isoformat()
if self.last_changed == self.last_updated:
last_updated_isoformat = last_changed_isoformat
last_changed_isoformat = last_updated_isoformat
else:
last_updated_isoformat = self.last_updated.isoformat()
last_changed_isoformat = self.last_changed.isoformat()
return {
"entity_id": self.entity_id,
"state": self.state,
Expand Down Expand Up @@ -801,13 +811,13 @@ def row_to_compressed_state(
if start_time:
last_changed = last_updated = start_time.timestamp()
else:
row_changed_changed: datetime = row.last_changed
row_last_updated: datetime = row.last_updated
if (
not (row_last_updated := row.last_updated)
not (row_changed_changed := row.last_changed)
or row_last_updated == row_changed_changed
):
last_changed = last_updated = process_datetime_to_timestamp(
row_changed_changed
row_last_updated
)
else:
last_changed = process_datetime_to_timestamp(row_changed_changed)
Expand Down
Loading