From 7e92212f57ce2613be567420b72232aecd53f4bd Mon Sep 17 00:00:00 2001 From: ArticOdin Date: Fri, 22 May 2026 10:41:28 +1000 Subject: [PATCH] test(tariff): Hypothesis fuzz the five tariff_engine invariants (Phase 11 PR-18) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-18 — Final v2.0 GA plank. Property-based tests cover the five invariants from v2 research § 7.3 for tariff_engine pure functions (calc_stepped_cost, get_stepped_import_rate, get_current_tou_period). - 1. Monotonic stepped cost: cost(k1) <= cost(k2) for k1 <= k2. - 2. Threshold equality: cost(threshold) == threshold * step1_rate. - 3. Step composition above threshold: cost = step1_cost + (k - threshold) * step2_rate. - 4. Stepped rate dichotomy: returned rate is exactly step1 or step2. - 5. TOU period closure: returned period name is in the supplied dict OR "unknown"; rate matches the period when found; full-day coverage produces no "unknown". ≥ 200 fuzzed examples per invariant via hypothesis. 9 test classes. 1054 total pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 2 + tests/test_tariff_engine_hypothesis.py | 222 +++++++++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 tests/test_tariff_engine_hypothesis.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c4c3cc..4a9b511 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ### Added +- Hypothesis property-based tests of `tariff_engine` pure functions. Five invariants per v2 research § 7.3: (1) `calc_stepped_cost` is monotonic-non-decreasing in kWh; (2) at threshold it equals `threshold * step1_rate` exactly; (3) above threshold it composes as `step1_cost + (k - threshold) * step2_rate`; (4) `get_stepped_import_rate` returns exactly one of `step1_rate` / `step2_rate`; (5) `get_current_tou_period` returns a known period name or `"unknown"`, with rate matching the period. 9 Hypothesis test classes; ≥200 fuzzed examples per invariant. Final plank toward v3.0 GA. (Phase 11 / PR-18) + - HACS validation job in CI. The existing `Validation` workflow now also runs `hacs/action@main` with `category: integration` on every push + PR. Hassfest job stays as-is — both validators run side-by-side. Catches HACS distribution issues (manifest schema drift, brands gaps, version bump misses) before merge. (Phase 11 / PR-17) - HA test-harness fixture prototypes (`tests/ha_fixtures.py`). Drop-in mocks for `OpenElectricityPriceSource`, `NEMWebPriceSource`, `async_add_external_statistics`, plus a `mock_config_entry_data` factory for DWT-OE entries. NOT auto-applied — the existing 1028 stub-conftest tests stay HA-free per D-P11-1 (dual-mode test strategy). New tests opt in by importing. `pytest-homeassistant-custom-component>=0.13.0` + `hypothesis>=6.100.0` added to `requirements.txt` for the new harness + Hypothesis fuzzing tests. 10 smoke tests cover the fixture shapes. (Phase 11 / PR-16) diff --git a/tests/test_tariff_engine_hypothesis.py b/tests/test_tariff_engine_hypothesis.py new file mode 100644 index 0000000..8ec6fbe --- /dev/null +++ b/tests/test_tariff_engine_hypothesis.py @@ -0,0 +1,222 @@ +"""Phase 11 PR-18 — Hypothesis fuzzing of tariff_engine pure functions. + +Five invariants per v2 research § 7.3: + +1. **Monotonic stepped cost**: ``calc_stepped_cost(t, k)`` is + non-decreasing in ``k`` for fixed tariff. +2. **Threshold equality**: at ``k == threshold``, + ``calc_stepped_cost`` equals ``threshold * step1_rate`` exactly. +3. **Step composition**: for ``k > threshold``, + ``calc_stepped_cost = step1_cost + (k - threshold) * step2_rate``. +4. **Stepped rate dichotomy**: ``get_stepped_import_rate`` returns + exactly one of ``step1_rate`` or ``step2_rate``. +5. **TOU period closure**: ``get_current_tou_period`` returns a + period name that's either in the supplied dict OR ``"unknown"`` + (never something else); and the rate matches the period's rate + when found. +""" + +from __future__ import annotations + +from datetime import datetime, timezone + +from hypothesis import given, settings, strategies as st + +from custom_components.pricehawk.tariff_engine import ( + calc_stepped_cost, + get_current_tou_period, + get_stepped_import_rate, +) + + +# Bounded strategies — real-world tariff rates are c/kWh in 0..200 range, +# consumption 0..200 kWh/day (a heavy household), thresholds 0.01..50. +_kwh = st.floats( + min_value=0.0, max_value=200.0, allow_nan=False, allow_infinity=False, +) +_rate = st.floats( + min_value=0.0, max_value=200.0, allow_nan=False, allow_infinity=False, +) +# Threshold floor: 0.01 (zero threshold is a degenerate edge case +# covered by an explicit test elsewhere; the production code guards +# against it but Hypothesis doesn't need to re-explore it). +_threshold = st.floats( + min_value=0.01, max_value=50.0, allow_nan=False, allow_infinity=False, +) + + +def _tariff(threshold: float, step1_rate: float, step2_rate: float) -> dict: + return { + "step1_threshold_kwh": threshold, + "step1_rate": step1_rate, + "step2_rate": step2_rate, + } + + +# ---------------------------------------------------------------------- +# Invariant 1: Monotonic stepped cost +# ---------------------------------------------------------------------- + + +class TestStepCostMonotonic: + @given(_threshold, _rate, _rate, _kwh, _kwh) + @settings(max_examples=200, deadline=None) + def test_cost_monotonic_in_kwh( + self, threshold, step1, step2, k1, k2, + ): + """Sort k1 <= k2; cost(t, k1) <= cost(t, k2).""" + lo, hi = (k1, k2) if k1 <= k2 else (k2, k1) + tariff = _tariff(threshold, step1, step2) + cost_lo = calc_stepped_cost(tariff, lo) + cost_hi = calc_stepped_cost(tariff, hi) + assert cost_lo <= cost_hi + 1e-9, ( + f"Non-monotonic: cost({lo})={cost_lo} > cost({hi})={cost_hi}" + ) + + +# ---------------------------------------------------------------------- +# Invariant 2: Threshold equality +# ---------------------------------------------------------------------- + + +class TestStepCostAtThreshold: + @given(_threshold, _rate, _rate) + @settings(max_examples=100, deadline=None) + def test_cost_at_threshold_uses_only_step1( + self, threshold, step1, step2, + ): + tariff = _tariff(threshold, step1, step2) + result = calc_stepped_cost(tariff, threshold) + expected = threshold * step1 + # Allow for float rounding tolerance. + assert abs(result - expected) < 1e-9, ( + f"At threshold {threshold} with step1={step1}, " + f"expected {expected}, got {result}" + ) + + +# ---------------------------------------------------------------------- +# Invariant 3: Step composition above threshold +# ---------------------------------------------------------------------- + + +class TestStepCostAboveThreshold: + @given(_threshold, _rate, _rate, _kwh) + @settings(max_examples=200, deadline=None) + def test_above_threshold_step_composition( + self, threshold, step1, step2, k, + ): + if k <= threshold: + return # Different invariant covers ≤ threshold. + tariff = _tariff(threshold, step1, step2) + result = calc_stepped_cost(tariff, k) + expected = threshold * step1 + (k - threshold) * step2 + assert abs(result - expected) < 1e-6, ( + f"At kwh={k}, threshold={threshold}, step1={step1}, step2={step2}: " + f"expected {expected}, got {result}" + ) + + +# ---------------------------------------------------------------------- +# Invariant 4: Stepped rate dichotomy +# ---------------------------------------------------------------------- + + +class TestSteppedRateDichotomy: + @given(_threshold, _rate, _rate, _kwh) + @settings(max_examples=200, deadline=None) + def test_returned_rate_is_one_of_steps( + self, threshold, step1, step2, k, + ): + tariff = _tariff(threshold, step1, step2) + rate = get_stepped_import_rate(tariff, k) + assert rate in (step1, step2), ( + f"get_stepped_import_rate({k}, threshold={threshold}) returned " + f"{rate} — must be step1={step1} or step2={step2}" + ) + + @given(_threshold, _rate, _rate, _kwh) + @settings(max_examples=200, deadline=None) + def test_below_threshold_returns_step1( + self, threshold, step1, step2, k, + ): + if k >= threshold: + return + tariff = _tariff(threshold, step1, step2) + assert get_stepped_import_rate(tariff, k) == step1 + + @given(_threshold, _rate, _rate, _kwh) + @settings(max_examples=200, deadline=None) + def test_at_or_above_threshold_returns_step2( + self, threshold, step1, step2, k, + ): + if k < threshold: + return + tariff = _tariff(threshold, step1, step2) + assert get_stepped_import_rate(tariff, k) == step2 + + +# ---------------------------------------------------------------------- +# Invariant 5: TOU period closure +# ---------------------------------------------------------------------- + + +def _basic_tou_periods() -> dict: + """Three-window day covering 24h with no gaps for fuzz tests.""" + return { + "peak": { + "rate": 39.6, + "windows": [["16:00", "21:00"]], + }, + "shoulder": { + "rate": 27.5, + "windows": [["07:00", "16:00"], ["21:00", "23:00"]], + }, + "offpeak": { + "rate": 11.0, + "windows": [["23:00", "07:00"]], # midnight-crossing + }, + } + + +class TestTOUPeriodClosure: + @given( + hour=st.integers(min_value=0, max_value=23), + minute=st.integers(min_value=0, max_value=59), + ) + @settings(max_examples=200, deadline=None) + def test_returns_known_period_or_unknown(self, hour, minute): + periods = _basic_tou_periods() + now = datetime(2026, 5, 22, hour, minute, tzinfo=timezone.utc) + name, rate = get_current_tou_period(periods, now) + assert name in periods or name == "unknown", ( + f"period name {name!r} not in {list(periods)} and not 'unknown'" + ) + + @given( + hour=st.integers(min_value=0, max_value=23), + minute=st.integers(min_value=0, max_value=59), + ) + @settings(max_examples=200, deadline=None) + def test_returned_rate_matches_period(self, hour, minute): + periods = _basic_tou_periods() + now = datetime(2026, 5, 22, hour, minute, tzinfo=timezone.utc) + name, rate = get_current_tou_period(periods, now) + if name in periods: + assert rate == periods[name]["rate"] + else: + assert rate == 0.0 + + @given( + hour=st.integers(min_value=0, max_value=23), + minute=st.integers(min_value=0, max_value=59), + ) + @settings(max_examples=200, deadline=None) + def test_full_day_coverage_no_unknown(self, hour, minute): + """With the basic 24h-covering periods, no minute returns 'unknown'.""" + periods = _basic_tou_periods() + now = datetime(2026, 5, 22, hour, minute, tzinfo=timezone.utc) + name, _ = get_current_tou_period(periods, now) + assert name != "unknown", ( + f"hour={hour:02d}:{minute:02d} fell through period coverage" + )