From 40f99968a5250b8228c9a27f9d01e7b3ac6b712b Mon Sep 17 00:00:00 2001 From: Mikel Eguiraun Date: Wed, 3 May 2023 14:56:39 +0200 Subject: [PATCH] MAXIV: new common HWO for machine information New MachInfo hardware object, for using at BioMAX and MicroMAX. Based on older HWO used at BioMAX, but updated to follow the new AbstractMachineInfo API. --- .coveragerc | 1 - mxcubecore/HardwareObjects/MAXIV/MachInfo.py | 206 ++++++++----------- test/pytest/test_hwo_maxiv_mach_info.py | 115 +++++++++++ 3 files changed, 196 insertions(+), 126 deletions(-) mode change 100644 => 100755 mxcubecore/HardwareObjects/MAXIV/MachInfo.py create mode 100644 test/pytest/test_hwo_maxiv_mach_info.py diff --git a/.coveragerc b/.coveragerc index 4bf2e797e8..a3b10b125f 100644 --- a/.coveragerc +++ b/.coveragerc @@ -7,5 +7,4 @@ omit = mxcubecore/HardwareObjects/EMBL/* mxcubecore/HardwareObjects/ESRF/* mxcubecore/HardwareObjects/LNLS/* - mxcubecore/HardwareObjects/MAXIV/* mxcubecore/HardwareObjects/SOLEIL/* diff --git a/mxcubecore/HardwareObjects/MAXIV/MachInfo.py b/mxcubecore/HardwareObjects/MAXIV/MachInfo.py old mode 100644 new mode 100755 index ab3701c258..ad603aecf7 --- a/mxcubecore/HardwareObjects/MAXIV/MachInfo.py +++ b/mxcubecore/HardwareObjects/MAXIV/MachInfo.py @@ -1,153 +1,109 @@ -""" -[Name] MachInfoMockup +import re +import math +import logging +import gevent +from tango import DeviceProxy +from mxcubecore.utils.units import sec_to_hour, A_to_mA +from mxcubecore.HardwareObjects.abstract.AbstractMachineInfo import AbstractMachineInfo -[Description] -MachInfo hardware objects are used to obtain information from the accelerator -control system. +# how often we refresh machine info +REFRESH_PERIOD_SEC = 30 +CLEANR = re.compile("<.*?>") -This is a mockup hardware object, it simulates the behaviour of an accelerator -information by : - - produces a current value that varies with time - - simulates a control room message that changes with some condition - () - - simulates +log = logging.getLogger("HWR") -[Emited signals] -machInfoChanged - pars: values (dict) - mandatory fields: - values['current'] type: str; desc: synchrotron radiation current in milli-amps - values['message'] type: str; desc: message from control room - values['attention'] type: boolean; desc: False (if no special attention is required) - True (if attention should be raised to the user) +def cleanhtml(raw_html): + cleantext = re.sub(CLEANR, "", raw_html) + return cleantext - optional fields: - any number of optional fields can be sent over with this signal by adding them in the - values dictionary - for example: - values['lifetime'] - values['topup_remaining'] -""" +def catch_errors(func): + """ + run wrapped function, catching all exception -import logging -import gevent -import time -import PyTango -from mxcubecore import HardwareRepository as HWR -from mxcubecore.BaseHardwareObjects import HardwareObject + If an exception as raised, log the exception and return 'unknown' + """ + def wrapper(*a, **kw): + try: + return func(*a, **kw) + except Exception: + log.exception("error fetching machine info") + return "unknown" -class MachInfo(HardwareObject): - default_current = 0 - default_lifetime = 0 - default_message = "" - default_topup_remaining = 0 + return wrapper - def __init__(self, *args): - Equipment.__init__(self, *args) - default_current = 0 - default_lifetime = 0 - default_message = "" - default_topup_remaining = 0 - self.current = self.default_current - self.lifetime = self.default_lifetime - self.message = self.default_message - - self.mach_info_channel = None - self.mach_curr_channel = None - def init(self): - try: - # self.mach_info_channel = self.get_channel_object("mach_info") - channel = self.get_property("mach_info") - self.mach_info_channel = PyTango.DeviceProxy(channel) - self.message = self.mach_info_channel.OperatorMessage - self.message += "\n" + self.mach_info_channel.R3NextInjection - except Exception as ex: - logging.getLogger("HWR").warning("Error initializing machine info channel") +class MachInfo(AbstractMachineInfo): + """ + Machine info hardware object for MAXIV site. - try: - # self.curr_info_channel = self.get_channel_object("curr_info") - channel_current = self.get_property("current") - self.curr_info_channel = PyTango.DeviceProxy(channel_current) - # why twice?? - # why hwr channel does not work?? why?? - if self.curr_info_channel is None: - self.curr_info_channel = PyTango.DeviceProxy(channel_current) - curr = self.curr_info_channel.Current - if curr < 0: - self.current = 0.00 - else: - self.current = "{:.2f}".format(curr * 1000) - self.lifetime = float( - "{:.2f}".format(self.curr_info_channel.Lifetime / 3600) - ) - except Exception as ex: - logging.getLogger("HWR").warning("Error initializing current info channel") - - self._run() - - def _run(self): - gevent.spawn(self._update_me) - - def _update_me(self): - self.t0 = time.time() + Provides to the user general information about the machine, + such as ring status, current, operator message, etc. - while True: - gevent.sleep(2) - self.message = self.mach_info_channel.OperatorMessage - self.message += "\n" + self.mach_info_channel.R3NextInjection - curr = self.curr_info_channel.Current - if curr < 0: - self.current = 0.00 - else: - self.current = "{:.2f}".format(curr * 1000) + This hardware objects fetches the information from attributes + of specified tango devices. - self.lifetime = float( - "{:.2f}".format(self.curr_info_channel.Lifetime / 3600) - ) + Hardware object properties: + mach_info (str): name of the machine status tango device + current (str): name of the ring status tango device + parameters (str): topics to export, see AbstractMachineInfo class for details + """ - self.attention = False - values = dict() - values["current"] = self.current - values["message"] = self.message - values["lifetime"] = self.lifetime - values["attention"] = self.attention - self.emit("machInfoChanged", values) - self.emit("valueChanged", values) + def __init__(self, *args): + super().__init__(*args) + self.mach_info = None + self.mach_curr = None - def get_current(self): - return self.current + def init(self): + super().init() - def getLifeTime(self): - return self.lifetime + self.mach_info = self._get_tango_device("mach_info") + self.mach_curr = self._get_tango_device("current") + gevent.spawn(self._refresh_ticker) - def getTopUpRemaining(self): - return self.topup_remaining + def _get_tango_device(self, property_name: str) -> DeviceProxy: + dev_name = self.get_property(property_name) + try: + return DeviceProxy(dev_name) + except Exception: + log.exception(f"error connecting to machine info tango device {dev_name}") - def getMessage(self): - return self.message + def _refresh_ticker(self): + while True: + self.update_value() + gevent.sleep(REFRESH_PERIOD_SEC) + @catch_errors + def get_current(self) -> str: + current = A_to_mA(self.mach_curr.Current) + return f"{current:.2f} mA" -def test(): - import sys + @catch_errors + def get_fillmode(self) -> str: + return self.mach_info.R3Mode - hwr = HWR.get_hardware_repository() - hwr.connect() + @catch_errors + def get_message(self) -> str: + return self.mach_info.OperatorMessage - conn = hwr.get_hardware_object(sys.argv[1]) + @catch_errors + def get_lifetime(self) -> str: + lifetime = self.mach_curr.Lifetime + if math.isnan(lifetime): + return "n/a" - print("Machine current: ", conn.get_current()) - print("Life time: ", conn.getLifeTime()) - print("TopUp remaining: ", conn.getTopUpRemaining()) - print("Message: ", conn.getMessage()) + return f"{sec_to_hour(lifetime):.2f} h" - while True: - gevent.wait(timeout=0.1) + @catch_errors + def get_injection(self) -> str: + return self.mach_info.R3NextInjection + @catch_errors + def get_status(self) -> str: + message = cleanhtml(self.mach_info.MachineMessage) + message = message.replace("R1", " R1").replace("Linac", " Linac") -if __name__ == "__main__": - test() + return message diff --git a/test/pytest/test_hwo_maxiv_mach_info.py b/test/pytest/test_hwo_maxiv_mach_info.py new file mode 100644 index 0000000000..c7203e28e1 --- /dev/null +++ b/test/pytest/test_hwo_maxiv_mach_info.py @@ -0,0 +1,115 @@ +import math +import pytest +from gevent.event import Event +from tango.server import Device, attribute, command +from tango.test_context import DeviceTestContext, MultiDeviceTestContext +from mxcubecore.HardwareObjects.MAXIV.MachInfo import MachInfo + + +class _Billboard(Device): + @attribute(name="R3Mode", dtype=str) + def r3_mode(self): + return "Delivery: Top-Up" + + @attribute(name="OperatorMessage", dtype=str) + def operator_message(self): + return "roses are blue" + + @attribute(name="R3NextInjection", dtype=str) + def r3_next_injection(self): + return "2024-06-12 14:00:00" + + @attribute(name="MachineMessage", dtype=str) + def machine_message(self): + return "R3: Shutdown
R1: Shutdown
Linac: Shutdown" + + +class _Dcct(Device): + def __init__(self, *a, **k): + super().__init__(*a, **k) + self._lifetime = 33627.44279505807 + + @attribute(name="Current", dtype=float) + def current(self): + return 0.39518965682844615 + + @attribute(name="Lifetime", dtype=float) + def lifetime(self): + return self._lifetime + + # + # special command, that is not present in a normal tango device, + # it is used by the tests to emulate the situation when there is + # no current in the storage ring and thus lifetime is 'n/a' + # + @command + def set_no_lifetime(self): + self._lifetime = math.nan + + +@pytest.fixture +def mach_info(): + # + # start our test proxy tango devices + # + devices_info = ( + { + "class": _Billboard, + "devices": [ + {"name": "test/device/billboard"}, + ], + }, + { + "class": _Dcct, + "devices": [ + { + "name": "test/device/dcct", + }, + ], + }, + ) + dev_ctx = MultiDeviceTestContext(devices_info, host="127.0.0.1", process=True) + dev_ctx.start() + + mach_info = MachInfo("/machine_info") + mach_info.set_property( + "parameters", + "['current', 'fillmode', 'message', 'lifetime', 'injection', 'status']", + ) + mach_info.set_property( + "mach_info", dev_ctx.get_device_access("test/device/billboard") + ) + mach_info.set_property("current", dev_ctx.get_device_access("test/device/dcct")) + + # listen for 'valueChanged' signal + signal_sent = Event() + mach_info.connect("valueChanged", lambda *_, **__: signal_sent.set()) + + mach_info.init() + yield mach_info + + # + # wait with tearing down the tango devices until the 'valueChanged' signal is sent, + # otherwise the emitting code can fail when it tries to read tango attributes + # + signal_sent.wait() + + # + # clean-up + # + dev_ctx.stop() + dev_ctx.join() + + +def test_read_all(mach_info: MachInfo): + assert mach_info.get_current() == "395.19 mA" + assert mach_info.get_fillmode() == "Delivery: Top-Up" + assert mach_info.get_message() == "roses are blue" + assert mach_info.get_lifetime() == "9.34 h" + assert mach_info.get_injection() == "2024-06-12 14:00:00" + assert mach_info.get_status() == "R3: Shutdown R1: Shutdown Linac: Shutdown" + + +def test_no_lifetime(mach_info: MachInfo): + mach_info.mach_curr.set_no_lifetime() + assert mach_info.get_lifetime() == "n/a"