Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 188 additions & 0 deletions custom_components/givenergy_local/binary_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""Binary sensor platform — a single plant-level battery out-of-spec alert (#78)."""

from __future__ import annotations

from collections.abc import Iterator
from dataclasses import dataclass, field
from datetime import datetime

from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import DeviceInfo, EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity

from .const import DOMAIN
from .coordinator import GivEnergyUpdateCoordinator

# LFP soft operating band. A cell that has genuinely drifted outside this for a
# sustained period warrants attention; the band is deliberately wider than the
# nominal working range so normal operation never crosses it.
CELL_MIN_V = 3.0
CELL_MAX_V = 3.5
# Unused cell slots in smaller packs read ~0 V. Anything below this floor is an
# unpopulated slot or a dropped read, not a real over-discharged cell, so it is
# excluded from the low-voltage check (the debounce handles dropped reads anyway).
CELL_PRESENT_FLOOR_V = 1.0

# Default cell-group temperature alert band (°C). Conservative LFP envelope; a
# tunable knob can follow if real-world data shows it needs widening.
TEMP_MIN_C = 0.0
TEMP_MAX_C = 50.0

# Hybrid debounce: a value must be out of spec for at least this long in
# wall-clock AND across at least this many distinct polls before the alert
# trips. Both bounds comfortably exceed the ~2-minute persistence that dongle
# bad-read garbage can exhibit (modbus#78), so a sustained fake read can't trip
# it, while the dual form survives both fast and slow pollers.
DEBOUNCE_SECONDS = 300
DEBOUNCE_MIN_POLLS = 3


@dataclass
class _Offender:
"""Tracks one metric that is currently out of spec across consecutive polls."""

battery: str
metric: str
value: float
first_seen: datetime
poll_count: int = 1


@dataclass
class _Reading:
battery: str
metric: str
value: float
detail: dict[str, object] = field(default_factory=dict)


async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
coordinator: GivEnergyUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
# Pointless on a battery-less (PV-only) install.
if not coordinator.data.batteries:
return
async_add_entities([GivEnergyBatteryOutOfSpecBinarySensor(coordinator)])


def _iter_readings(batteries: list) -> Iterator[_Reading]:
"""Yield every in-spec-checkable cell-voltage and group-temperature reading."""
for index, battery in enumerate(batteries):
serial = battery.serial_number or f"battery_{index}"
for cell in range(1, 17):
value = getattr(battery, f"v_cell_{cell:02d}", None)
if value is None or value < CELL_PRESENT_FLOOR_V:
continue # unpopulated slot or dropped read
yield _Reading(serial, f"cell_{cell:02d}_voltage", float(value), {"cell": cell})
for lo, hi in ((1, 4), (5, 8), (9, 12), (13, 16)):
value = getattr(battery, f"t_cells_{lo:02d}_{hi:02d}", None)
if value is None:
continue
yield _Reading(
serial,
f"cells_{lo:02d}_{hi:02d}_temperature",
float(value),
{"cell_group": f"{lo}-{hi}"},
)


def _out_of_spec(reading: _Reading) -> bool:
if reading.metric.endswith("_voltage"):
return not (CELL_MIN_V <= reading.value <= CELL_MAX_V)
return not (TEMP_MIN_C <= reading.value <= TEMP_MAX_C)


class GivEnergyBatteryOutOfSpecBinarySensor(
CoordinatorEntity[GivEnergyUpdateCoordinator], BinarySensorEntity
):
"""One plant-level alert: on when any monitored battery value has been out of
spec for a sustained period, debounced against transient bad reads."""

_attr_has_entity_name = True
_attr_name = "Battery Out Of Spec"
_attr_device_class = BinarySensorDeviceClass.PROBLEM
_attr_entity_category = EntityCategory.DIAGNOSTIC

def __init__(self, coordinator: GivEnergyUpdateCoordinator) -> None:
super().__init__(coordinator)
serial = coordinator.data.inverter_serial_number
self._attr_unique_id = f"{serial}_battery_out_of_spec"
self._attr_device_info = DeviceInfo(identifiers={(DOMAIN, serial)})
self._offenders: dict[str, _Offender] = {}
self._last_processed_refresh: datetime | None = None
self._evaluate()

def _handle_coordinator_update(self) -> None:
self._evaluate()
super()._handle_coordinator_update()

def _evaluate(self) -> None:
"""Advance the per-metric debounce trackers off the latest poll.

Runs once per distinct successful refresh so each poll counts exactly
once towards the sustained-poll requirement.
"""
refresh = self.coordinator.last_successful_refresh
if refresh is None or refresh == self._last_processed_refresh:
return
self._last_processed_refresh = refresh

current: dict[str, _Reading] = {
f"{r.battery}:{r.metric}": r
for r in _iter_readings(self.coordinator.data.batteries)
if _out_of_spec(r)
}
# Drop anything that has returned to spec.
for key in list(self._offenders):
if key not in current:
del self._offenders[key]
# Record/advance current offenders.
for key, reading in current.items():
existing = self._offenders.get(key)
if existing is None:
self._offenders[key] = _Offender(
battery=reading.battery,
metric=reading.metric,
value=reading.value,
first_seen=refresh,
)
else:
existing.value = reading.value
existing.poll_count += 1
Comment on lines +139 to +160

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Issue: Transient dropped reads (None) reset the debounce state machine

In the current implementation, any dropped read (None) or unpopulated slot (which are skipped in _iter_readings) will not be present in current. As a result, key not in current evaluates to True, and the offender is immediately deleted from self._offenders.

This means a single transient dropped read (which is common with flaky Modbus dongles) will completely reset the debounce counter for an active excursion, potentially preventing the alert from ever tripping or causing it to flap.

Solution

Instead of clearing offenders that are missing from current, we should only clear them if they are successfully read and confirmed to be back in spec. If a reading is missing (e.g., due to a dropped read), we should preserve the offender's state without incrementing its poll count.

        readings = {
            f"{r.battery}:{r.metric}": r
            for r in _iter_readings(self.coordinator.data.batteries)
        }
        # Drop anything that has returned to spec.
        for key in list(self._offenders):
            if key in readings and not _out_of_spec(readings[key]):
                del self._offenders[key]
        # Record/advance current offenders.
        for key, reading in readings.items():
            if _out_of_spec(reading):
                existing = self._offenders.get(key)
                if existing is None:
                    self._offenders[key] = _Offender(
                        battery=reading.battery,
                        metric=reading.metric,
                        value=reading.value,
                        first_seen=refresh,
                    )
                else:
                    existing.value = reading.value
                    existing.poll_count += 1


@property
def is_on(self) -> bool:
refresh = self.coordinator.last_successful_refresh
if refresh is None:
return False
return any(
offender.poll_count >= DEBOUNCE_MIN_POLLS
and (refresh - offender.first_seen).total_seconds() >= DEBOUNCE_SECONDS
for offender in self._offenders.values()
)

@property
def extra_state_attributes(self) -> dict[str, object]:
refresh = self.coordinator.last_successful_refresh
offenders = [
{
"battery": o.battery,
"metric": o.metric,
"value": o.value,
"polls_out_of_spec": o.poll_count,
"seconds_out_of_spec": (
int((refresh - o.first_seen).total_seconds()) if refresh else 0
),
}
for o in self._offenders.values()
]
return {"offenders": offenders}
2 changes: 1 addition & 1 deletion custom_components/givenergy_local/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

DEFAULT_PASSIVE = False

PLATFORMS = ["sensor", "switch", "number", "select", "time"]
PLATFORMS = ["binary_sensor", "sensor", "switch", "number", "select", "time"]

SERVICE_REBOOT_INVERTER = "reboot_inverter"
SERVICE_CALIBRATE_BATTERY_SOC = "calibrate_battery_soc"
Expand Down
161 changes: 161 additions & 0 deletions tests/test_binary_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
"""Tests for the battery out-of-spec binary sensor (issue #78)."""

from datetime import UTC, datetime, timedelta
from types import SimpleNamespace
from unittest.mock import MagicMock

from homeassistant.helpers import entity_registry as er

from custom_components.givenergy_local.binary_sensor import (
DEBOUNCE_MIN_POLLS,
DEBOUNCE_SECONDS,
GivEnergyBatteryOutOfSpecBinarySensor,
)
from custom_components.givenergy_local.const import DOMAIN

BASE = datetime(2026, 1, 1, tzinfo=UTC)


def _battery(serial="BT1", cells=None, temps=None):
cells = cells if cells is not None else [3.30] * 16
temps = temps if temps is not None else [22.0] * 4
bat = SimpleNamespace(serial_number=serial)
for i, volt in enumerate(cells, start=1):
setattr(bat, f"v_cell_{i:02d}", volt)
for (lo, hi), temp in zip(((1, 4), (5, 8), (9, 12), (13, 16)), temps):
setattr(bat, f"t_cells_{lo:02d}_{hi:02d}", temp)
return bat


def _entity(batteries):
coordinator = MagicMock()
coordinator.data.batteries = batteries
coordinator.data.inverter_serial_number = "SA1234G123"
coordinator.last_successful_refresh = None
return GivEnergyBatteryOutOfSpecBinarySensor(coordinator), coordinator


def _poll(entity, coordinator, at: datetime, batteries=None) -> None:
if batteries is not None:
coordinator.data.batteries = batteries
coordinator.last_successful_refresh = at
entity._evaluate()


# ---------------------------------------------------------------------------
# Debounce state machine (unit-level)
# ---------------------------------------------------------------------------


def test_in_spec_never_trips():
entity, coordinator = _entity([_battery()])
for n in range(6):
_poll(entity, coordinator, BASE + timedelta(seconds=n * DEBOUNCE_SECONDS))
assert entity.is_on is False


def test_sustained_low_voltage_trips_after_time_and_polls():
bad = [_battery(cells=[2.5] + [3.30] * 15)]
entity, coordinator = _entity(bad)

_poll(entity, coordinator, BASE)
assert entity.is_on is False # 1 poll, 0 s

_poll(entity, coordinator, BASE + timedelta(seconds=150))
assert entity.is_on is False # 2 polls, 150 s — below both thresholds

_poll(entity, coordinator, BASE + timedelta(seconds=DEBOUNCE_SECONDS))
# 3 polls AND >= 300 s elapsed
assert entity.is_on is True


def test_time_met_but_too_few_polls_does_not_trip():
bad = [_battery(cells=[2.5] + [3.30] * 15)]
entity, coordinator = _entity(bad)
# Two polls far apart: duration satisfied, poll count is not.
_poll(entity, coordinator, BASE)
_poll(entity, coordinator, BASE + timedelta(seconds=DEBOUNCE_SECONDS * 2))
assert DEBOUNCE_MIN_POLLS > 2
assert entity.is_on is False


def test_transient_excursion_clears_and_resets():
good = [_battery()]
bad = [_battery(cells=[2.5] + [3.30] * 15)]
entity, coordinator = _entity(good)

# Out of spec for two polls (mimicking the ~2-min dongle garbage), then back.
_poll(entity, coordinator, BASE, batteries=bad)
_poll(entity, coordinator, BASE + timedelta(seconds=150), batteries=bad)
_poll(entity, coordinator, BASE + timedelta(seconds=300), batteries=good)
assert entity.is_on is False
assert entity._offenders == {}

# A fresh excursion starts its counter from scratch.
_poll(entity, coordinator, BASE + timedelta(seconds=450), batteries=bad)
assert entity._offenders["BT1:cell_01_voltage"].poll_count == 1


def test_unpopulated_cell_reading_zero_is_ignored():
# Cell 16 unpopulated (~0 V) must not be read as an over-discharged cell.
batteries = [_battery(cells=[3.30] * 15 + [0.0])]
entity, coordinator = _entity(batteries)
for n in range(5):
_poll(entity, coordinator, BASE + timedelta(seconds=n * DEBOUNCE_SECONDS))
assert entity.is_on is False


def test_sustained_overtemperature_trips():
batteries = [_battery(temps=[60.0, 22.0, 22.0, 22.0])]
entity, coordinator = _entity(batteries)
for n in range(DEBOUNCE_MIN_POLLS):
_poll(entity, coordinator, BASE + timedelta(seconds=n * 150))
assert entity.is_on is True


def test_none_readings_are_skipped():
bat = _battery()
bat.v_cell_05 = None # a dropped read on one cell
bat.t_cells_05_08 = None
entity, coordinator = _entity([bat])
for n in range(5):
_poll(entity, coordinator, BASE + timedelta(seconds=n * DEBOUNCE_SECONDS))
assert entity.is_on is False

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Improvement: Add test for transient dropped reads

To ensure that transient dropped reads (None) do not reset the debounce state machine, we should add a test case verifying that the offender state and poll count are preserved when a None reading is encountered during an active excursion.

Suggested change
assert entity.is_on is False
assert entity.is_on is False
def test_transient_none_reading_does_not_reset_debounce():
bad = [_battery(cells=[2.5] + [3.30] * 15)]
dropped = [_battery(cells=[None] + [3.30] * 15)]
entity, coordinator = _entity(bad)
_poll(entity, coordinator, BASE)
assert entity._offenders["BT1:cell_01_voltage"].poll_count == 1
# A dropped read should preserve the offender and its poll count
_poll(entity, coordinator, BASE + timedelta(seconds=150), batteries=dropped)
assert "BT1:cell_01_voltage" in entity._offenders
assert entity._offenders["BT1:cell_01_voltage"].poll_count == 1
# The next out-of-spec read resumes the counter
_poll(entity, coordinator, BASE + timedelta(seconds=300), batteries=bad)
assert entity._offenders["BT1:cell_01_voltage"].poll_count == 2



def test_attributes_enumerate_current_offenders():
bad = [_battery(serial="BTX", cells=[2.5] + [3.30] * 15)]
entity, coordinator = _entity(bad)
_poll(entity, coordinator, BASE)
_poll(entity, coordinator, BASE + timedelta(seconds=120))
offenders = entity.extra_state_attributes["offenders"]
assert len(offenders) == 1
(offender,) = offenders
assert offender["battery"] == "BTX"
assert offender["metric"] == "cell_01_voltage"
assert offender["polls_out_of_spec"] == 2
assert offender["seconds_out_of_spec"] == 120


# ---------------------------------------------------------------------------
# Creation gating (integration-level)
# ---------------------------------------------------------------------------


def _entity_id(hass, unique_id: str) -> str | None:
return er.async_get(hass).async_get_entity_id("binary_sensor", DOMAIN, unique_id)


async def test_entity_created_and_off_for_in_spec_plant(hass, setup_integration):
entity_id = _entity_id(hass, "SA1234G123_battery_out_of_spec")
assert entity_id is not None
assert hass.states.get(entity_id).state == "off"


async def test_not_created_for_battery_less_plant(hass, mock_client, mock_plant, mock_config_entry):
mock_plant.batteries = []
mock_plant.number_batteries = 0
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert _entity_id(hass, "SA1234G123_battery_out_of_spec") is None
Loading