diff --git a/.coveragerc b/.coveragerc
index 4bf2e797e8..a3b10b125f 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -7,5 +7,4 @@ omit =
mxcubecore/HardwareObjects/EMBL/*
mxcubecore/HardwareObjects/ESRF/*
mxcubecore/HardwareObjects/LNLS/*
- mxcubecore/HardwareObjects/MAXIV/*
mxcubecore/HardwareObjects/SOLEIL/*
diff --git a/mxcubecore/HardwareObjects/MAXIV/MachInfo.py b/mxcubecore/HardwareObjects/MAXIV/MachInfo.py
old mode 100644
new mode 100755
index ab3701c258..ad603aecf7
--- a/mxcubecore/HardwareObjects/MAXIV/MachInfo.py
+++ b/mxcubecore/HardwareObjects/MAXIV/MachInfo.py
@@ -1,153 +1,109 @@
-"""
-[Name] MachInfoMockup
+import re
+import math
+import logging
+import gevent
+from tango import DeviceProxy
+from mxcubecore.utils.units import sec_to_hour, A_to_mA
+from mxcubecore.HardwareObjects.abstract.AbstractMachineInfo import AbstractMachineInfo
-[Description]
-MachInfo hardware objects are used to obtain information from the accelerator
-control system.
+# how often we refresh machine info
+REFRESH_PERIOD_SEC = 30
+CLEANR = re.compile("<.*?>")
-This is a mockup hardware object, it simulates the behaviour of an accelerator
-information by :
- - produces a current value that varies with time
- - simulates a control room message that changes with some condition
- ()
- - simulates
+log = logging.getLogger("HWR")
-[Emited signals]
-machInfoChanged
- pars: values (dict)
- mandatory fields:
- values['current'] type: str; desc: synchrotron radiation current in milli-amps
- values['message'] type: str; desc: message from control room
- values['attention'] type: boolean; desc: False (if no special attention is required)
- True (if attention should be raised to the user)
+def cleanhtml(raw_html):
+ cleantext = re.sub(CLEANR, "", raw_html)
+ return cleantext
- optional fields:
- any number of optional fields can be sent over with this signal by adding them in the
- values dictionary
- for example:
- values['lifetime']
- values['topup_remaining']
-"""
+def catch_errors(func):
+ """
+ run wrapped function, catching all exception
-import logging
-import gevent
-import time
-import PyTango
-from mxcubecore import HardwareRepository as HWR
-from mxcubecore.BaseHardwareObjects import HardwareObject
+ If an exception as raised, log the exception and return 'unknown'
+ """
+ def wrapper(*a, **kw):
+ try:
+ return func(*a, **kw)
+ except Exception:
+ log.exception("error fetching machine info")
+ return "unknown"
-class MachInfo(HardwareObject):
- default_current = 0
- default_lifetime = 0
- default_message = ""
- default_topup_remaining = 0
+ return wrapper
- def __init__(self, *args):
- Equipment.__init__(self, *args)
- default_current = 0
- default_lifetime = 0
- default_message = ""
- default_topup_remaining = 0
- self.current = self.default_current
- self.lifetime = self.default_lifetime
- self.message = self.default_message
-
- self.mach_info_channel = None
- self.mach_curr_channel = None
- def init(self):
- try:
- # self.mach_info_channel = self.get_channel_object("mach_info")
- channel = self.get_property("mach_info")
- self.mach_info_channel = PyTango.DeviceProxy(channel)
- self.message = self.mach_info_channel.OperatorMessage
- self.message += "\n" + self.mach_info_channel.R3NextInjection
- except Exception as ex:
- logging.getLogger("HWR").warning("Error initializing machine info channel")
+class MachInfo(AbstractMachineInfo):
+ """
+ Machine info hardware object for MAXIV site.
- try:
- # self.curr_info_channel = self.get_channel_object("curr_info")
- channel_current = self.get_property("current")
- self.curr_info_channel = PyTango.DeviceProxy(channel_current)
- # why twice??
- # why hwr channel does not work?? why??
- if self.curr_info_channel is None:
- self.curr_info_channel = PyTango.DeviceProxy(channel_current)
- curr = self.curr_info_channel.Current
- if curr < 0:
- self.current = 0.00
- else:
- self.current = "{:.2f}".format(curr * 1000)
- self.lifetime = float(
- "{:.2f}".format(self.curr_info_channel.Lifetime / 3600)
- )
- except Exception as ex:
- logging.getLogger("HWR").warning("Error initializing current info channel")
-
- self._run()
-
- def _run(self):
- gevent.spawn(self._update_me)
-
- def _update_me(self):
- self.t0 = time.time()
+ Provides to the user general information about the machine,
+ such as ring status, current, operator message, etc.
- while True:
- gevent.sleep(2)
- self.message = self.mach_info_channel.OperatorMessage
- self.message += "\n" + self.mach_info_channel.R3NextInjection
- curr = self.curr_info_channel.Current
- if curr < 0:
- self.current = 0.00
- else:
- self.current = "{:.2f}".format(curr * 1000)
+ This hardware objects fetches the information from attributes
+ of specified tango devices.
- self.lifetime = float(
- "{:.2f}".format(self.curr_info_channel.Lifetime / 3600)
- )
+ Hardware object properties:
+ mach_info (str): name of the machine status tango device
+ current (str): name of the ring status tango device
+ parameters (str): topics to export, see AbstractMachineInfo class for details
+ """
- self.attention = False
- values = dict()
- values["current"] = self.current
- values["message"] = self.message
- values["lifetime"] = self.lifetime
- values["attention"] = self.attention
- self.emit("machInfoChanged", values)
- self.emit("valueChanged", values)
+ def __init__(self, *args):
+ super().__init__(*args)
+ self.mach_info = None
+ self.mach_curr = None
- def get_current(self):
- return self.current
+ def init(self):
+ super().init()
- def getLifeTime(self):
- return self.lifetime
+ self.mach_info = self._get_tango_device("mach_info")
+ self.mach_curr = self._get_tango_device("current")
+ gevent.spawn(self._refresh_ticker)
- def getTopUpRemaining(self):
- return self.topup_remaining
+ def _get_tango_device(self, property_name: str) -> DeviceProxy:
+ dev_name = self.get_property(property_name)
+ try:
+ return DeviceProxy(dev_name)
+ except Exception:
+ log.exception(f"error connecting to machine info tango device {dev_name}")
- def getMessage(self):
- return self.message
+ def _refresh_ticker(self):
+ while True:
+ self.update_value()
+ gevent.sleep(REFRESH_PERIOD_SEC)
+ @catch_errors
+ def get_current(self) -> str:
+ current = A_to_mA(self.mach_curr.Current)
+ return f"{current:.2f} mA"
-def test():
- import sys
+ @catch_errors
+ def get_fillmode(self) -> str:
+ return self.mach_info.R3Mode
- hwr = HWR.get_hardware_repository()
- hwr.connect()
+ @catch_errors
+ def get_message(self) -> str:
+ return self.mach_info.OperatorMessage
- conn = hwr.get_hardware_object(sys.argv[1])
+ @catch_errors
+ def get_lifetime(self) -> str:
+ lifetime = self.mach_curr.Lifetime
+ if math.isnan(lifetime):
+ return "n/a"
- print("Machine current: ", conn.get_current())
- print("Life time: ", conn.getLifeTime())
- print("TopUp remaining: ", conn.getTopUpRemaining())
- print("Message: ", conn.getMessage())
+ return f"{sec_to_hour(lifetime):.2f} h"
- while True:
- gevent.wait(timeout=0.1)
+ @catch_errors
+ def get_injection(self) -> str:
+ return self.mach_info.R3NextInjection
+ @catch_errors
+ def get_status(self) -> str:
+ message = cleanhtml(self.mach_info.MachineMessage)
+ message = message.replace("R1", " R1").replace("Linac", " Linac")
-if __name__ == "__main__":
- test()
+ return message
diff --git a/test/pytest/test_hwo_maxiv_mach_info.py b/test/pytest/test_hwo_maxiv_mach_info.py
new file mode 100644
index 0000000000..c7203e28e1
--- /dev/null
+++ b/test/pytest/test_hwo_maxiv_mach_info.py
@@ -0,0 +1,115 @@
+import math
+import pytest
+from gevent.event import Event
+from tango.server import Device, attribute, command
+from tango.test_context import DeviceTestContext, MultiDeviceTestContext
+from mxcubecore.HardwareObjects.MAXIV.MachInfo import MachInfo
+
+
+class _Billboard(Device):
+ @attribute(name="R3Mode", dtype=str)
+ def r3_mode(self):
+ return "Delivery: Top-Up"
+
+ @attribute(name="OperatorMessage", dtype=str)
+ def operator_message(self):
+ return "roses are blue"
+
+ @attribute(name="R3NextInjection", dtype=str)
+ def r3_next_injection(self):
+ return "2024-06-12 14:00:00"
+
+ @attribute(name="MachineMessage", dtype=str)
+ def machine_message(self):
+ return "R3: Shutdown
R1: Shutdown
Linac: Shutdown"
+
+
+class _Dcct(Device):
+ def __init__(self, *a, **k):
+ super().__init__(*a, **k)
+ self._lifetime = 33627.44279505807
+
+ @attribute(name="Current", dtype=float)
+ def current(self):
+ return 0.39518965682844615
+
+ @attribute(name="Lifetime", dtype=float)
+ def lifetime(self):
+ return self._lifetime
+
+ #
+ # special command, that is not present in a normal tango device,
+ # it is used by the tests to emulate the situation when there is
+ # no current in the storage ring and thus lifetime is 'n/a'
+ #
+ @command
+ def set_no_lifetime(self):
+ self._lifetime = math.nan
+
+
+@pytest.fixture
+def mach_info():
+ #
+ # start our test proxy tango devices
+ #
+ devices_info = (
+ {
+ "class": _Billboard,
+ "devices": [
+ {"name": "test/device/billboard"},
+ ],
+ },
+ {
+ "class": _Dcct,
+ "devices": [
+ {
+ "name": "test/device/dcct",
+ },
+ ],
+ },
+ )
+ dev_ctx = MultiDeviceTestContext(devices_info, host="127.0.0.1", process=True)
+ dev_ctx.start()
+
+ mach_info = MachInfo("/machine_info")
+ mach_info.set_property(
+ "parameters",
+ "['current', 'fillmode', 'message', 'lifetime', 'injection', 'status']",
+ )
+ mach_info.set_property(
+ "mach_info", dev_ctx.get_device_access("test/device/billboard")
+ )
+ mach_info.set_property("current", dev_ctx.get_device_access("test/device/dcct"))
+
+ # listen for 'valueChanged' signal
+ signal_sent = Event()
+ mach_info.connect("valueChanged", lambda *_, **__: signal_sent.set())
+
+ mach_info.init()
+ yield mach_info
+
+ #
+ # wait with tearing down the tango devices until the 'valueChanged' signal is sent,
+ # otherwise the emitting code can fail when it tries to read tango attributes
+ #
+ signal_sent.wait()
+
+ #
+ # clean-up
+ #
+ dev_ctx.stop()
+ dev_ctx.join()
+
+
+def test_read_all(mach_info: MachInfo):
+ assert mach_info.get_current() == "395.19 mA"
+ assert mach_info.get_fillmode() == "Delivery: Top-Up"
+ assert mach_info.get_message() == "roses are blue"
+ assert mach_info.get_lifetime() == "9.34 h"
+ assert mach_info.get_injection() == "2024-06-12 14:00:00"
+ assert mach_info.get_status() == "R3: Shutdown R1: Shutdown Linac: Shutdown"
+
+
+def test_no_lifetime(mach_info: MachInfo):
+ mach_info.mach_curr.set_no_lifetime()
+ assert mach_info.get_lifetime() == "n/a"