Skip to content

Commit

Permalink
(PR #40) Implement pacifist monitoring
Browse files Browse the repository at this point in the history
* Implement pacifist monitoring
* Update nitpick exceptions file
  • Loading branch information
edan-bainglass authored Jan 30, 2024
1 parent 8d49383 commit 455f07c
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 107 deletions.
28 changes: 24 additions & 4 deletions aiida_aurora/monitors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

from json import load
from tempfile import NamedTemporaryFile
from typing import Optional

from aiida.common.log import LOG_LEVEL_REPORT
from aiida.orm import CalcJobNode
from aiida.transports import Transport

Expand All @@ -13,7 +15,7 @@ def monitor_capacity_threshold(
transport: Transport,
settings: dict,
filename="snapshot.json",
) -> Optional[str]:
) -> str | None:
"""Retrieve and inspect snapshot to determine if capacity has
fallen below threshold for several consecutive cycles.
Expand Down Expand Up @@ -48,7 +50,6 @@ def monitor_capacity_threshold(
"""

analyzer = CapacityAnalyzer(**settings)
analyzer.set_logger(node.logger)

try:

Expand All @@ -72,7 +73,26 @@ def monitor_capacity_threshold(
if not snapshot:
raise ValueError

return analyzer.analyze(snapshot)
analyzer.analyze(snapshot)

node.base.extras.set_many({
"status": analyzer.status,
"snapshot": analyzer.snapshot,
})

node.logger.log(LOG_LEVEL_REPORT, analyzer.report)

if node.base.extras.get("marked_for_death", False):

node.base.extras.set("flag", "☠️")

if "snapshot" in node.base.extras:
node.base.extras.delete("snapshot")

return "Job terminated by monitor per user request"

if analyzer.flag:
node.base.extras.set("flag", f"🍅{analyzer.flag}")

except TypeError:
node.logger.error(f"'{filename}' not in dictionary format")
Expand Down
156 changes: 76 additions & 80 deletions aiida_aurora/utils/analyzers.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,14 @@
from itertools import groupby
from logging import LoggerAdapter
from typing import Optional
from __future__ import annotations

from aiida.common.log import AIIDA_LOGGER, LOG_LEVEL_REPORT
import numpy as np

from .parsers import get_data_from_raw


class Analyzer:
"""Base class for all analyzers.
"""Base class for all analyzers."""

Attributes
==========
`logger` : `Union[AiidaLoggerType, LoggerAdapter]`
The associated logger.
"""

logger = AIIDA_LOGGER.getChild("monitor")

def set_logger(self, logger: LoggerAdapter) -> None:
"""Set the analyzer logger.
Parameters
----------
`logger` : `LoggerAdapter`
The logger of the analyzed calculation node.
"""
self.logger = logger

def analyze(self, snapshot: dict) -> Optional[str]:
def analyze(self, snapshot: dict) -> None:
"""Analyze the experiment snapshot against a condition.
Condition is defined in subclass analyzers.
Expand All @@ -37,12 +17,6 @@ def analyze(self, snapshot: dict) -> Optional[str]:
----------
`snapshot` : `dict`
The loaded snapshot dictionary.
Returns
-------
`Optional[str]`
A string if a defined condition has been met,
`None` otherwise.
"""
raise NotImplementedError

Expand All @@ -67,6 +41,7 @@ def __init__(
check_type="discharge_capacity",
threshold=0.8,
consecutive_cycles=2,
keep_last=10,
) -> None:
"""`CapacityAnalyzer` constructor.
Expand All @@ -80,6 +55,8 @@ def __init__(
`consecutive_cycles` : `int`
The number of required consecutive cycles,
`2` by default.
`keep_last` : `int`
The number of cycles to keep in snapshot.
Raises
------
Expand All @@ -93,8 +70,13 @@ def __init__(
self.threshold = threshold
self.consecutive = consecutive_cycles
self.is_discharge = check_type == "discharge_capacity"
self.keep_last = keep_last

self.flag = ""
self.status = ""
self.report = ""

def analyze(self, snapshot: dict) -> Optional[str]:
def analyze(self, snapshot: dict) -> None:
"""Analyze the snapshot.
Check if capacity has fallen below threshold for required
Expand All @@ -104,84 +86,98 @@ def analyze(self, snapshot: dict) -> Optional[str]:
----------
`snapshot` : `dict`
The loaded snapshot dictionary.
Returns
-------
`Optional[str]`
If condition is met, an exit message, `None` otherwise.
"""
self.capacities = self._get_capacities(snapshot)
self.cycles = len(self.capacities)
return None if self.cycles < 1 else self._check_capacity()
self._extract_capacities(snapshot)
self._check_capacity()
self._truncate_snapshot()

###########
# PRIVATE #
###########

def _get_capacities(self, snapshot: dict):
def _extract_capacities(self, snapshot: dict) -> None:
"""Post-process the snapshot to extract capacities.
Parameters
----------
`snapshot` : `dict`
The loaded snapshot dictionary.
Returns
-------
`_type_`
A `numpy` array of capacities (in mAh), or empty list
if failed to process snapshot.
"""
try:
data = get_data_from_raw(snapshot)
return data['Qd'] if self.is_discharge else data['Qc']
self.snapshot = get_data_from_raw(snapshot)
self.capacities = self.snapshot["Qd"] \
if self.is_discharge \
else self.snapshot["Qc"]
except KeyError as err:
self.logger.error(f"missing '{str(err)}' in snapshot")
return []
self.report = f"missing '{str(err)}' in snapshot"
self.snapshot = {}
self.capacities = []

def _check_capacity(self) -> Optional[str]:
def _check_capacity(self) -> None:
"""Check if capacity has fallen below threshold for required
consecutive cycles.
consecutive cycles."""

Returns
-------
`Optional[str]`
If condition is met, an exit message, `None` otherwise.
"""
if (n := len(self.capacities)) < 2:
self.report = "need at least two complete cycles"
return

n = self.cycles
Qs = self.capacities[0]
Q = self.capacities[-1]
Q = self.capacities[-2]
Qt = self.threshold * Qs
C_per = Q / Qs * 100

message = f"cycle #{n} : {Q = :.2f} mAh ({Q / Qs * 100:.1f}%)"
self.report = f"cycle #{n} : {Q = :.2f} mAh ({C_per:.1f}%)"
self.status = f"(cycle #{n} : C @ {C_per:.1f}%)"

if Q < Qt:
message += f" : {(Qt - Q) / Qt * 100:.1f}% below threshold"
self.report += f" - {(Qt - Q) / Qt * 100:.1f}% below threshold"

below_threshold = np.where(self.capacities < Qt)[0] + 1
consecutively_below = self._filter_consecutive(below_threshold)

self.logger.log(LOG_LEVEL_REPORT, message)
if len(consecutively_below):

below_threshold = self._count_cycles_below_threshold()
if below_threshold >= self.consecutive:
return f"Capacity below threshold ({Qt:.2f} mAh) " \
f"for {below_threshold} cycles!"
cycles_str = str(consecutively_below).replace("'", "")
self.report += f" - cycles below threshold: {cycles_str}"

return None
if consecutively_below[-1] == n:
self.flag = "🔴"
else:
self.flag = "🟡"

def _count_cycles_below_threshold(self) -> int:
"""Count the number of consecutive cycles below threshold.
def _filter_consecutive(self, cycles: list[int]) -> list[int]:
"""Return cycles below threshold for `x` consecutive cycles.
Parameters
----------
`cycles` : `list[int]`
The cycles below threshold.
Returns
-------
`int`
The number of consecutive cycles below threshold.
`list[int]`
The cycles below threshold for `x` consecutive cycles.
"""
Qt = self.threshold * self.capacities[0]
return next(
(
len(list(group)) # cycle-count of first below-threshold group
for below, group in groupby(self.capacities < Qt)
if below
),
0,
)
return [
cycle for i, cycle in enumerate(cycles)
if i >= self.consecutive - 1 and \
all(cycles[i - j] == cycle - j for j in range(1, self.consecutive))
]

def _truncate_snapshot(self) -> None:
"""Truncate the snapshot to user defined size."""

truncated = {}

size = min(self.keep_last, len(self.snapshot["cycle-number"]))

for key, value in self.snapshot.items():

if key in ("time", "I", "Ewe", "Q"):
index = self.snapshot["cycle-index"][-size]
truncated[key] = value[index:]

elif key in ("cycle-number", "Qc", "Qd", "Ec", "Ed"):
truncated[key] = value[-size:]

self.snapshot = truncated
31 changes: 19 additions & 12 deletions aiida_aurora/utils/cycling_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json

import numpy as np
from pandas import DataFrame
from pandas.io.formats.style import Styler

Expand Down Expand Up @@ -278,9 +279,6 @@ def process_data(node: CalcJobNode) -> tuple[dict, str, Styler | str]:
Post-processed data, warning, and analysis | error message.
"""

if node.process_state and "finished" not in node.process_state.value:
return {}, f"Job terminated with message '{node.process_status}'", ""

warning = ""

if node.exit_status:
Expand All @@ -289,16 +287,19 @@ def process_data(node: CalcJobNode) -> tuple[dict, str, Styler | str]:
warning += f"{node.exit_message}" if node.exit_message else generic
warning += "\n\n"

if "results" in node.outputs:
data = get_data_from_results(node.outputs.results)
elif "raw_data" in node.outputs:
data = get_data_from_file(node.outputs.raw_data)
elif "retrieved" in node.outputs:
data = get_data_from_file(node.outputs.retrieved)
elif "remote_folder" in node.outputs:
data = get_data_from_remote(node.outputs.remote_folder)
if node.exit_status is None:
data = get_data_from_snapshot(node.base.extras.get("snapshot", {}))
else:
data = {}
if "results" in node.outputs:
data = get_data_from_results(node.outputs.results)
elif "raw_data" in node.outputs:
data = get_data_from_file(node.outputs.raw_data)
elif "retrieved" in node.outputs:
data = get_data_from_file(node.outputs.retrieved)
elif "remote_folder" in node.outputs:
data = get_data_from_remote(node.outputs.remote_folder)
else:
data = {}

return data, warning, add_analysis(data)

Expand Down Expand Up @@ -346,6 +347,11 @@ def get_data_from_remote(source: RemoteData) -> dict:
return {}


def get_data_from_snapshot(snapshot: dict) -> dict:
"""docstring"""
return {k: np.array(v) for k, v in snapshot.items()}


def add_analysis(data: dict) -> Styler | str:
"""Return analysis details.
Expand Down Expand Up @@ -381,4 +387,5 @@ def add_analysis(data: dict) -> Styler | str:
]).hide(axis="index")

else:

return "ERROR! Failed to find or parse output"
Loading

0 comments on commit 455f07c

Please sign in to comment.