Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions custom_components/pricehawk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
# Schedule periodic state persistence
coordinator.schedule_persist()

# Phase 3.1 — schedule daily multi-plan ranking job at 00:30 local.
# A first run is also kicked off immediately so the alternatives sensor
# isn't empty until the first scheduled 00:30 run on a fresh install.
coordinator.schedule_daily_ranking()
hass.async_create_task(coordinator.async_run_ranking_job())

# Copy www assets (icon + HTML) and register sidebar panel
await copy_www_assets(hass)
await setup_panel_iframe(hass, entry)
Expand Down Expand Up @@ -179,6 +185,34 @@ async def handle_backfill(call: object) -> None:

hass.services.async_register(DOMAIN, "backfill_history", handle_backfill)

# Phase 3.1 commit 5 — manual ranking trigger. Lets users force-run
# the ranking pipeline from Developer Tools → Services without
# waiting for the next 00:30 schedule fire. Most useful right after
# switching plans (so the alternatives ranking reflects the new
# distributor / postcode immediately).
async def handle_rank_alternatives(call: object) -> None:
# CR-fix: malformed service payload (e.g. ``top_k: "abc"`` from
# a typo in a YAML automation) would raise ValueError/TypeError
# and fail the call. Coerce defensively + fall back to default.
raw = call.data.get("top_k", 20) # type: ignore[attr-defined]
try:
top_k = int(raw)
except (TypeError, ValueError):
_LOGGER.warning(
Comment on lines +193 to +201

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Avoid duplicating the default top_k value; reuse the shared constant to prevent drift.

20 is hardcoded in the default for call.data.get, in the warning text, and implicitly as the clamp bound. Prefer a single source of truth: either import DEFAULT_TOP_K from the ranking layer and use it in all three places, or define a module-level constant here and reference it everywhere. That avoids divergence if the default changes elsewhere.

Suggested implementation:

    RANK_ALTERNATIVES_DEFAULT_TOP_K = 20

    # Phase 3.1 commit 5 — manual ranking trigger. Lets users force-run
    # the ranking pipeline from Developer Tools → Services without
    # waiting for the next 00:30 schedule fire. Most useful right after
    # switching plans (so the alternatives ranking reflects the new
    # distributor / postcode immediately).
    async def handle_rank_alternatives(call: object) -> None:
        # CR-fix: malformed service payload (e.g. ``top_k: "abc"`` from
        # a typo in a YAML automation) would raise ValueError/TypeError
        # and fail the call. Coerce defensively + fall back to default.
        raw = call.data.get("top_k", RANK_ALTERNATIVES_DEFAULT_TOP_K)  # type: ignore[attr-defined]
        try:
            top_k = int(raw)
        except (TypeError, ValueError):
            _LOGGER.warning(
                "rank_alternatives: invalid top_k=%r, using default %d",
                raw,
                RANK_ALTERNATIVES_DEFAULT_TOP_K,
            )
            top_k = RANK_ALTERNATIVES_DEFAULT_TOP_K
        top_k = max(1, min(top_k, 100))
        result = await coordinator.async_run_ranking_job(top_k=top_k)
        _LOGGER.info(
            "rank_alternatives service: ran successfully, %d result(s)",
            len(result),
        )

If there is already a shared default (e.g. DEFAULT_TOP_K) in your ranking layer, you may prefer to:

  1. Import that constant at the top of this file.
  2. Replace RANK_ALTERNATIVES_DEFAULT_TOP_K with the imported constant name.
    This preserves a single source of truth across modules.

"rank_alternatives: invalid top_k=%r, using default 20", raw
)
top_k = 20
top_k = max(1, min(top_k, 100))
result = await coordinator.async_run_ranking_job(top_k=top_k)
_LOGGER.info(
"rank_alternatives service: ran successfully, %d result(s)",
len(result),
)

hass.services.async_register(
DOMAIN, "rank_alternatives", handle_rank_alternatives
)

_LOGGER.info("PriceHawk integration setup complete")
return True

Expand All @@ -191,6 +225,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
)
if coordinator:
coordinator.cancel_persist()
coordinator.cancel_ranking()
await coordinator.async_persist_state()

await remove_panel(hass)
Expand All @@ -199,5 +234,6 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
if not hass.data.get(DOMAIN):
hass.services.async_remove(DOMAIN, "analyze_csv")
hass.services.async_remove(DOMAIN, "backfill_history")
hass.services.async_remove(DOMAIN, "rank_alternatives")

return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
169 changes: 169 additions & 0 deletions custom_components/pricehawk/cdr/ranking_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""Phase 3.1 — Coordinator-facing daily ranking job orchestration.

Lives outside ``coordinator.py`` so the pure logic is unit-testable
without the HA app context that ``PriceHawkCoordinator`` requires
(``DataUpdateCoordinator[T]`` parameterised bases don't survive the
mock-based conftest).

Each function takes the inputs it needs explicitly — config-entry
options, the registry, an HTTP session. The coordinator-side wrapper
methods (in ``coordinator.py``) own the side effects (scheduling
callbacks, persisting results, swallowing exceptions across the daily
boundary).
"""
from __future__ import annotations

import logging
from typing import Any, TYPE_CHECKING

from .ranking import DEFAULT_TOP_K, rank_alternatives
from .registry import RetailerEndpoint, find_by_brand, get_registry

if TYPE_CHECKING:
import aiohttp

_LOGGER = logging.getLogger(__name__)

# Big-4 nationally-active retailers scanned on every daily run.
# EME refdata2 doesn't carry per-retailer geography, so we always
# attempt these four; ``rank_alternatives`` then filters their plans
# by the user's postcode/distributor anyway.
# Each entry is a brand-name fragment handed to ``find_by_brand`` to
# resolve (at most) one registry endpoint per fragment.
# NOTE(review): matching semantics (substring vs. exact, case handling)
# live in ``registry.find_by_brand`` — confirm before adding fragments.
DEFAULT_COMPETITOR_BRAND_FRAGMENTS: tuple[str, ...] = (
    "agl",
    "origin",
    "energyaustralia",
    "red energy",
)


def get_user_geography(
    options: dict[str, Any],
) -> tuple[str | None, str | None, str | None]:
    """Extract ``(state, postcode, distributor)`` from config-entry options.

    - ``state`` is always ``None`` here; callers that need it derive it
      elsewhere.
    - ``postcode`` is the ``cdr_postcode`` option, with any falsy value
      normalised to ``None``.
    - ``distributor`` is the first element of
      ``cdr_plan.data.geography.distributors`` when that is a non-empty
      list whose first element is a string; otherwise ``None``.

    Every nesting level is type-checked, so a malformed stored plan
    (wrong container types, ``None`` values, empty lists) degrades to
    ``None`` fields instead of raising.
    """
    postcode = options.get("cdr_postcode") or None

    geography = _safe_plan_data(options).get("geography") or {}
    if not isinstance(geography, dict):
        return None, postcode, None

    candidates = geography.get("distributors")
    if isinstance(candidates, list) and candidates:
        first = candidates[0]
        if isinstance(first, str):
            return None, postcode, first

    # Missing, empty, or non-string distributor list.
    return None, postcode, None


def _safe_plan_data(options: dict[str, Any]) -> dict[str, Any]:
    """Return ``options["cdr_plan"]["data"]`` or ``{}`` if the shape is off.

    An empty dict is returned when ``cdr_plan`` is absent or not a dict,
    or when its ``data`` entry is absent or not a dict. Shared by
    ``get_user_geography`` and ``get_competitor_retailers`` so both read
    the stored plan through the same defensive path.
    """
    envelope = options.get("cdr_plan")
    payload = envelope.get("data") if isinstance(envelope, dict) else None
    if isinstance(payload, dict):
        return payload
    return {}


async def get_competitor_retailers(
session: aiohttp.ClientSession,
options: dict[str, Any],
*,
competitor_fragments: tuple[str, ...] = DEFAULT_COMPETITOR_BRAND_FRAGMENTS,
) -> list[RetailerEndpoint]:
"""Build the retailer list scanned during the daily ranking job.

Composition (in priority order, dedup by ``brand_id``):
1. User's CURRENT retailer (from ``cdr_plan.data.brand``).
Comment on lines +84 to +93

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Handle the case where the current brand exists but does not resolve to a registry entry more explicitly.

If current_brand is set but find_by_brand returns no match, we silently ignore it and only use the hardcoded competitors. Consider either logging this condition (debug/info) or adding a more forgiving lookup (e.g., case/whitespace normalization) so it’s easier to detect when the registry and stored plan are out of sync.

Suggested implementation:

    current_brand = plan_data.get("brand")

    current_retailer: RetailerEndpoint | None = None
    if current_brand:
        # First, try the exact lookup used previously
        current_retailer = registry.find_by_brand(current_brand)

        if not current_retailer:
            # Fallback: trim whitespace and do a case-insensitive comparison against
            # known registry brands to recover from minor formatting drift.
            normalized_brand = current_brand.strip().lower()
            for retailer in registry:
                try:
                    retailer_brand = retailer.brand
                except AttributeError:
                    # Be defensive if registry entries are not uniform
                    continue

                if retailer_brand and retailer_brand.strip().lower() == normalized_brand:
                    current_retailer = retailer
                    break

        if not current_retailer:
            # Explicitly log when the stored plan refers to a brand that we cannot
            # resolve in the registry so configuration drift is discoverable.
            _LOGGER.info(
                "CDR ranking: current brand %r from plan did not match any registry entry",
                current_brand,
            )

I assumed the existence of:

  1. A registry object that is iterable and whose items have a .brand attribute.
  2. A module-level _LOGGER logger instance.

To integrate this cleanly, you should:

  1. Ensure registry is iterable and that its items expose a brand attribute (or adjust the loop to match your actual type, e.g., registry.retailers or registry.values()).
  2. Confirm _LOGGER is defined in this module (typical Home Assistant pattern: import logging and _LOGGER = logging.getLogger(__name__)). If it is named differently, update the logging call accordingly.
  3. If RetailerEndpoint is not the correct type for current_retailer, adjust the annotation accordingly or remove it to match your existing conventions.

2. The hardcoded big-4 competitors.

Falls back to baked-in registry via ``get_registry``'s own fallback
when live fetch fails. Returns ``[]`` if registry is empty (edge
case; baked-in always has 100+ entries).
"""
endpoints, source = await get_registry(session)
_LOGGER.debug(
"ranking: registry source=%s, %d retailers", source, len(endpoints)
)

out: list[RetailerEndpoint] = []
seen_brand_ids: set[str] = set()

plan_data = _safe_plan_data(options)
raw_brand = plan_data.get("brand")
# ``brand`` is sometimes shipped as None or non-string by retailers;
# only accept str to keep ``.strip()`` and ``find_by_brand`` safe.
current_brand = raw_brand.strip() if isinstance(raw_brand, str) else ""
if current_brand:
current = find_by_brand(endpoints, current_brand)
if current is not None:
out.append(current)
seen_brand_ids.add(current.brand_id)

for fragment in competitor_fragments:
match = find_by_brand(endpoints, fragment)
if match is None or match.brand_id in seen_brand_ids:
continue
out.append(match)
seen_brand_ids.add(match.brand_id)

return out


async def run_ranking_job(
    session: aiohttp.ClientSession,
    options: dict[str, Any],
    *,
    top_k: int = DEFAULT_TOP_K,
    plan_cache: dict[str, dict[str, Any]] | None = None,
    competitor_fragments: tuple[str, ...] = DEFAULT_COMPETITOR_BRAND_FRAGMENTS,
) -> list[dict[str, Any]]:
    """Execute one cheap-rank pass and return the top-K plans.

    Steps, in order:
      1. Resolve the retailers to scan via ``get_competitor_retailers``.
      2. Read the user's postcode / distributor via ``get_user_geography``.
      3. Delegate to ``rank_alternatives`` with those inputs.

    Args:
        session: HTTP session forwarded to the registry and ranking calls.
        options: Config-entry options holding ``cdr_plan`` / ``cdr_postcode``.
        top_k: Maximum number of plans requested from ``rank_alternatives``.
        plan_cache: Optional dict forwarded as ``cache=`` to
            ``rank_alternatives``.
        competitor_fragments: Brand fragments used to pick competitor
            retailers.

    Returns:
        Whatever ``rank_alternatives`` returns, or ``[]`` when no retailer
        could be resolved.

    No exceptions are caught here. Scheduling, persisting the result and
    shielding the daily boundary from exceptions are the caller's
    (coordinator's) job. Deep-rank (consumption replay) is not part of
    this pipeline yet.
    """
    candidates = await get_competitor_retailers(
        session, options, competitor_fragments=competitor_fragments
    )
    if not candidates:
        _LOGGER.info("ranking: no competitor retailers resolved; skipping")
        return []

    state, postcode, distributor = get_user_geography(options)

    ranked = await rank_alternatives(
        session,
        candidates,
        state=state,
        postcode=postcode,
        distributor=distributor,
        top_k=top_k,
        cache=plan_cache,
    )
    return ranked
117 changes: 115 additions & 2 deletions custom_components/pricehawk/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

import logging
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from typing import Any

import aiohttp
Expand All @@ -13,7 +13,7 @@
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.event import async_call_later
from homeassistant.helpers.event import async_call_later, async_track_time_change
from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.util import dt as dt_util
Expand Down Expand Up @@ -46,6 +46,8 @@
CONF_LOCALVOLTS_PARTNER_ID,
LOCALVOLTS_API_POLL_INTERVAL,
)
from .cdr.ranking import DEFAULT_TOP_K
from .cdr.ranking_job import run_ranking_job
from .explanation import build_explanation
from .localvolts_api import aggregate_to_half_hour, fetch_recent_intervals
from .providers.cdr_plan import CdrPlanProvider
Expand All @@ -62,6 +64,13 @@
_MAX_RETRIES = 3
_RETRY_BASE_DELAY = 2 # seconds, doubles each attempt

# Daily ranking job runs at this local time. 00:30 is after midnight
# rollover so today's daily_cost_history is already final, and well
# before users' morning HA dashboards refresh. Competitor retailer
# list lives in ``cdr.ranking_job`` so it stays testable without HA.
# Both values are passed as ``hour=`` / ``minute=`` to
# ``async_track_time_change`` in ``schedule_daily_ranking``.
_RANKING_RUN_HOUR = 0
_RANKING_RUN_MINUTE = 30


def _extract_peak_rate_c_inc_gst(cdr_plan: dict[str, Any] | None) -> float | None:
"""Phase 3.0e — pull PEAK rate from a CDR plan envelope.
Expand Down Expand Up @@ -244,6 +253,28 @@ def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
self._store = Store(hass, STORAGE_VERSION, STORAGE_KEY)
self._persist_unsub: CALLBACK_TYPE | None = None

# Phase 3.1 — multi-plan ranking. Top-K cheaper alternatives
# populated by the daily 00:30 ranking job; consumed by the
# ranked-alternatives sensor (Phase 3.1 commit 6) and HA service
# (commit 5). Deep-rank (consumption replay through evaluator)
# is deferred to Phase 3.2 when HA-history backfill exists —
# without enough recorded slots, deep-rank has no signal.
self._cheap_ranked_alternatives: list[dict[str, Any]] = []
self._ranking_last_run_at: datetime | None = None
# Plan-detail cache reused across same-day runs so a manual
# rerun via the rank_alternatives service skips re-fetching
# plans already pulled by the morning scheduled run. Cache
# clears on the FIRST run of a new calendar day so overnight
# republished plans get refreshed.
self._ranking_plan_cache: dict[str, dict[str, Any]] = {}
self._ranking_cache_date: date | None = None
self._ranking_unsub: CALLBACK_TYPE | None = None
# CR-fix: scheduled callback + manual service trigger can both
# call async_run_ranking_job concurrently. A second concurrent
# entry would interleave _ranking_plan_cache mutations and
# duplicate every expensive CDR detail fetch. Lock serialises.
# NOTE(review): no ``import asyncio`` is visible in this diff's
# import hunks — confirm the module already imports it.
self._ranking_lock = asyncio.Lock()

# ------------------------------------------------------------------
# Amber REST API polling
# ------------------------------------------------------------------
Expand Down Expand Up @@ -1157,6 +1188,88 @@ def cancel_persist(self) -> None:
self._persist_unsub()
self._persist_unsub = None

# ------------------------------------------------------------------
# Phase 3.1 — daily multi-plan ranking job
# ------------------------------------------------------------------

def schedule_daily_ranking(self) -> None:
    """Arm the once-a-day ranking trigger (00:30 local time).

    The trigger is a time-based listener registered through
    ``async_track_time_change``. Any listener registered by an earlier
    call is cancelled first, so calling this repeatedly never stacks
    callbacks. The unsubscribe handle is kept on ``_ranking_unsub`` for
    ``cancel_ranking``.
    """
    # Drop a previous registration before creating a new one.
    self.cancel_ranking()

    async def _ranking_callback(_now: datetime) -> None:
        # The fire time is irrelevant; the job reads the clock itself.
        await self.async_run_ranking_job()

    fire_at = {
        "hour": _RANKING_RUN_HOUR,
        "minute": _RANKING_RUN_MINUTE,
        "second": 0,
    }
    self._ranking_unsub = async_track_time_change(
        self.hass, _ranking_callback, **fire_at
    )

def cancel_ranking(self) -> None:
    """Unregister the daily ranking listener, if one is active.

    Does nothing when no listener is registered. Otherwise calls the
    stored unsubscribe handle and then resets ``_ranking_unsub`` to
    ``None``.
    """
    unsubscribe = self._ranking_unsub
    if unsubscribe is None:
        return
    unsubscribe()
    self._ranking_unsub = None

async def async_run_ranking_job(
    self, *, top_k: int = DEFAULT_TOP_K
) -> list[dict[str, Any]]:
    """Run the ranking pipeline once and return the current top-K list.

    Entry point for the scheduled daily callback; also awaitable
    directly for an on-demand run. It is a thin wrapper over
    ``cdr.ranking_job.run_ranking_job`` that owns the coordinator-side
    effects:

    - serialises runs through ``_ranking_lock``;
    - empties the plan-detail cache on the first run of a new local
      calendar day, and keeps it for same-day reruns;
    - logs and swallows any ``Exception`` from the pipeline;
    - stores a non-empty result and stamps ``_ranking_last_run_at``.

    Args:
        top_k: Maximum number of alternatives requested from the pipeline.

    Returns:
        The fresh ranking when the run produced one; otherwise the
        previously stored ranking (which may be empty).
    """
    # One run at a time: a concurrent caller waits for the lock and then
    # performs its own run against the already-warm plan cache.
    async with self._ranking_lock:
        # Reset the cache BEFORE running, keyed on the local date, so a
        # new day starts cold while same-day reruns stay warm.
        current_day = dt_util.now().date()
        if current_day != self._ranking_cache_date:
            self._ranking_plan_cache.clear()
            self._ranking_cache_date = current_day

        session = async_get_clientsession(self.hass)
        try:
            fresh = await run_ranking_job(
                session,
                dict(self.config_entry.options),
                top_k=top_k,
                plan_cache=self._ranking_plan_cache,
            )
        except Exception:  # noqa: BLE001 — daily job must not raise
            _LOGGER.exception("ranking: pipeline raised; keeping prior results")
            return self._cheap_ranked_alternatives

        if not fresh:
            # An empty run is treated as transient (no retailers
            # resolved / all retailers down): keep the prior ranking
            # and leave the last-run timestamp untouched.
            return self._cheap_ranked_alternatives

        self._cheap_ranked_alternatives = fresh
        self._ranking_last_run_at = dt_util.now()
        _LOGGER.info(
            "ranking: persisted %d alternative(s)", len(fresh),
        )
        return fresh

# ------------------------------------------------------------------
# Options update / engine rebuild
# ------------------------------------------------------------------
Expand Down
Loading
Loading