Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MAXIV: update Machine Info HWO #974

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@ omit =
mxcubecore/HardwareObjects/EMBL/*
mxcubecore/HardwareObjects/ESRF/*
mxcubecore/HardwareObjects/LNLS/*
mxcubecore/HardwareObjects/MAXIV/*
mxcubecore/HardwareObjects/SOLEIL/*
206 changes: 81 additions & 125 deletions mxcubecore/HardwareObjects/MAXIV/MachInfo.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,153 +1,109 @@
"""
[Name] MachInfoMockup
import re
import math
import logging
import gevent
from tango import DeviceProxy
from mxcubecore.utils.units import sec_to_hour, A_to_mA
from mxcubecore.HardwareObjects.abstract.AbstractMachineInfo import AbstractMachineInfo

[Description]
MachInfo hardware objects are used to obtain information from the accelerator
control system.
# how often we refresh machine info
REFRESH_PERIOD_SEC = 30
CLEANR = re.compile("<.*?>")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also easily make it configurable via get_property but perhaps not desired in your case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel it's not worth the hassle in this case. We have not changed this value on a very long time.


This is a mockup hardware object, it simulates the behaviour of an accelerator
information by :

- produces a current value that varies with time
- simulates a control room message that changes with some condition
()
- simulates
log = logging.getLogger("HWR")

[Emited signals]
machInfoChanged
pars: values (dict)

mandatory fields:
values['current'] type: str; desc: synchrotron radiation current in milli-amps
values['message'] type: str; desc: message from control room
values['attention'] type: boolean; desc: False (if no special attention is required)
True (if attention should be raised to the user)
def cleanhtml(raw_html):
cleantext = re.sub(CLEANR, "", raw_html)
return cleantext

optional fields:
any number of optional fields can be sent over with this signal by adding them in the
values dictionary

for example:
values['lifetime']
values['topup_remaining']
"""
def catch_errors(func):
"""
run wrapped function, catching all exception

import logging
import gevent
import time
import PyTango
from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObject
If an exception as raised, log the exception and return 'unknown'
"""

def wrapper(*a, **kw):
try:
return func(*a, **kw)
except Exception:
log.exception("error fetching machine info")
return "unknown"

class MachInfo(HardwareObject):
default_current = 0
default_lifetime = 0
default_message = ""
default_topup_remaining = 0
return wrapper

def __init__(self, *args):
Equipment.__init__(self, *args)
default_current = 0
default_lifetime = 0
default_message = ""
default_topup_remaining = 0
self.current = self.default_current
self.lifetime = self.default_lifetime
self.message = self.default_message

self.mach_info_channel = None
self.mach_curr_channel = None

def init(self):
try:
# self.mach_info_channel = self.get_channel_object("mach_info")
channel = self.get_property("mach_info")
self.mach_info_channel = PyTango.DeviceProxy(channel)
self.message = self.mach_info_channel.OperatorMessage
self.message += "\n" + self.mach_info_channel.R3NextInjection
except Exception as ex:
logging.getLogger("HWR").warning("Error initializing machine info channel")
class MachInfo(AbstractMachineInfo):
"""
Machine info hardware object for MAXIV site.

try:
# self.curr_info_channel = self.get_channel_object("curr_info")
channel_current = self.get_property("current")
self.curr_info_channel = PyTango.DeviceProxy(channel_current)
# why twice??
# why hwr channel does not work?? why??
if self.curr_info_channel is None:
self.curr_info_channel = PyTango.DeviceProxy(channel_current)
curr = self.curr_info_channel.Current
if curr < 0:
self.current = 0.00
else:
self.current = "{:.2f}".format(curr * 1000)
self.lifetime = float(
"{:.2f}".format(self.curr_info_channel.Lifetime / 3600)
)
except Exception as ex:
logging.getLogger("HWR").warning("Error initializing current info channel")

self._run()

def _run(self):
gevent.spawn(self._update_me)

def _update_me(self):
self.t0 = time.time()
Provides to the user general information about the machine,
such as ring status, current, operator message, etc.

while True:
gevent.sleep(2)
self.message = self.mach_info_channel.OperatorMessage
self.message += "\n" + self.mach_info_channel.R3NextInjection
curr = self.curr_info_channel.Current
if curr < 0:
self.current = 0.00
else:
self.current = "{:.2f}".format(curr * 1000)
This hardware objects fetches the information from attributes
of specified tango devices.

self.lifetime = float(
"{:.2f}".format(self.curr_info_channel.Lifetime / 3600)
)
Hardware object properties:
mach_info (str): name of the machine status tango device
current (str): name of the ring status tango device
parameters (str): topics to export, see AbstractMachineInfo class for details
"""

self.attention = False
values = dict()
values["current"] = self.current
values["message"] = self.message
values["lifetime"] = self.lifetime
values["attention"] = self.attention
self.emit("machInfoChanged", values)
self.emit("valueChanged", values)
def __init__(self, *args):
super().__init__(*args)
self.mach_info = None
self.mach_curr = None

def get_current(self):
return self.current
def init(self):
super().init()

def getLifeTime(self):
return self.lifetime
self.mach_info = self._get_tango_device("mach_info")
self.mach_curr = self._get_tango_device("current")
gevent.spawn(self._refresh_ticker)

def getTopUpRemaining(self):
return self.topup_remaining
def _get_tango_device(self, property_name: str) -> DeviceProxy:
dev_name = self.get_property(property_name)
try:
return DeviceProxy(dev_name)
except Exception:
log.exception(f"error connecting to machine info tango device {dev_name}")

def getMessage(self):
return self.message
def _refresh_ticker(self):
while True:
self.update_value()
gevent.sleep(REFRESH_PERIOD_SEC)

@catch_errors
def get_current(self) -> str:
current = A_to_mA(self.mach_curr.Current)
return f"{current:.2f} mA"

def test():
import sys
@catch_errors
def get_fillmode(self) -> str:
return self.mach_info.R3Mode

hwr = HWR.get_hardware_repository()
hwr.connect()
@catch_errors
def get_message(self) -> str:
return self.mach_info.OperatorMessage

conn = hwr.get_hardware_object(sys.argv[1])
@catch_errors
def get_lifetime(self) -> str:
lifetime = self.mach_curr.Lifetime
if math.isnan(lifetime):
return "n/a"

print("Machine current: ", conn.get_current())
print("Life time: ", conn.getLifeTime())
print("TopUp remaining: ", conn.getTopUpRemaining())
print("Message: ", conn.getMessage())
return f"{sec_to_hour(lifetime):.2f} h"

while True:
gevent.wait(timeout=0.1)
@catch_errors
def get_injection(self) -> str:
return self.mach_info.R3NextInjection

@catch_errors
def get_status(self) -> str:
message = cleanhtml(self.mach_info.MachineMessage)
message = message.replace("R1", " R1").replace("Linac", " Linac")

if __name__ == "__main__":
test()
return message
115 changes: 115 additions & 0 deletions test/pytest/test_hwo_maxiv_mach_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import math
import pytest
from gevent.event import Event
from tango.server import Device, attribute, command
from tango.test_context import DeviceTestContext, MultiDeviceTestContext
from mxcubecore.HardwareObjects.MAXIV.MachInfo import MachInfo


class _Billboard(Device):
@attribute(name="R3Mode", dtype=str)
def r3_mode(self):
return "Delivery: Top-Up"

@attribute(name="OperatorMessage", dtype=str)
def operator_message(self):
return "roses are blue"

@attribute(name="R3NextInjection", dtype=str)
def r3_next_injection(self):
return "2024-06-12 14:00:00"

@attribute(name="MachineMessage", dtype=str)
def machine_message(self):
return "<b>R3:</b> Shutdown<br><b>R1:</b> Shutdown<br><b>Linac:</b> Shutdown"


class _Dcct(Device):
def __init__(self, *a, **k):
super().__init__(*a, **k)
self._lifetime = 33627.44279505807

@attribute(name="Current", dtype=float)
def current(self):
return 0.39518965682844615

@attribute(name="Lifetime", dtype=float)
def lifetime(self):
return self._lifetime

#
# special command, that is not present in a normal tango device,
# it is used by the tests to emulate the situation when there is
# no current in the storage ring and thus lifetime is 'n/a'
#
@command
def set_no_lifetime(self):
self._lifetime = math.nan


@pytest.fixture
def mach_info():
#
# start our test proxy tango devices
#
devices_info = (
{
"class": _Billboard,
"devices": [
{"name": "test/device/billboard"},
],
},
{
"class": _Dcct,
"devices": [
{
"name": "test/device/dcct",
},
],
},
)
dev_ctx = MultiDeviceTestContext(devices_info, host="127.0.0.1", process=True)
dev_ctx.start()

mach_info = MachInfo("/machine_info")
mach_info.set_property(
"parameters",
"['current', 'fillmode', 'message', 'lifetime', 'injection', 'status']",
)
mach_info.set_property(
"mach_info", dev_ctx.get_device_access("test/device/billboard")
)
mach_info.set_property("current", dev_ctx.get_device_access("test/device/dcct"))

# listen for 'valueChanged' signal
signal_sent = Event()
mach_info.connect("valueChanged", lambda *_, **__: signal_sent.set())

mach_info.init()
yield mach_info

#
# wait with tearing down the tango devices until the 'valueChanged' signal is sent,
# otherwise the emitting code can fail when it tries to read tango attributes
#
signal_sent.wait()

#
# clean-up
#
dev_ctx.stop()
dev_ctx.join()


def test_read_all(mach_info: MachInfo):
assert mach_info.get_current() == "395.19 mA"
assert mach_info.get_fillmode() == "Delivery: Top-Up"
assert mach_info.get_message() == "roses are blue"
assert mach_info.get_lifetime() == "9.34 h"
assert mach_info.get_injection() == "2024-06-12 14:00:00"
assert mach_info.get_status() == "R3: Shutdown R1: Shutdown Linac: Shutdown"


def test_no_lifetime(mach_info: MachInfo):
mach_info.mach_curr.set_no_lifetime()
assert mach_info.get_lifetime() == "n/a"
Loading