diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/K10CR1.py b/qcodes_contrib_drivers/drivers/Thorlabs/K10CR1.py index 5f8ad701d..7d628670a 100644 --- a/qcodes_contrib_drivers/drivers/Thorlabs/K10CR1.py +++ b/qcodes_contrib_drivers/drivers/Thorlabs/K10CR1.py @@ -4,7 +4,7 @@ import qcodes.utils.validators as vals from qcodes import Instrument -from .APT import Thorlabs_APT, ThorlabsHWType +from .private.APT import Thorlabs_APT, ThorlabsHWType class RotationDirection(enum.Enum): diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/KDC101.py b/qcodes_contrib_drivers/drivers/Thorlabs/KDC101.py new file mode 100644 index 000000000..91dedf4e6 --- /dev/null +++ b/qcodes_contrib_drivers/drivers/Thorlabs/KDC101.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +"""QCoDes-Driver for Thorlab KDC101 K-Cube Brushed DC Servo Motor Controller +https://www.thorlabs.com/thorproduct.cfm?partnumber=KDC101 + +Authors: + Julien Barrier, +""" +import logging +from typing import Optional + +from .private.CC import _Thorlabs_CC + +log = logging.getLogger(__name__) + +class Thorlabs_KDC101(_Thorlabs_CC): + """Instrument driver for the Thorlabs KDC101 servo motor controller + + Args: + name: Instrument name. + serial_number: Serial number of the device. + dll_path: Path to the kinesis dll for the instrument to use. + dll_dir: Directory in which the kinesis dll are stored. + simulation: Enables the simulation manager. + polling: Polling rate in ms. + home: Sets the device to home state. + """ + def __init__(self, + name: str, + serial_number: str, + dll_path: Optional[str] = None, + dll_dir: Optional[str] = None, + simulation: bool = False, + polling: int = 200, + home: bool = False, + **kwargs): + if dll_path: + self._dll_path = dll_path + else: + self._dll_path = 'Thorlabs.MotionControl.KCube.DCServo.dll' + self._dll_dir: Optional[str] = dll_dir if dll_dir else None + super().__init__(name, serial_number, self._dll_path, self._dll_dir, + simulation, polling, home, **kwargs) diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/KLS1550.py b/qcodes_contrib_drivers/drivers/Thorlabs/KLS1550.py index 532eff9f1..2c42de72a 100644 --- a/qcodes_contrib_drivers/drivers/Thorlabs/KLS1550.py +++ b/qcodes_contrib_drivers/drivers/Thorlabs/KLS1550.py @@ -1,81 +1,39 @@ -from qcodes import Instrument -import qcodes.utils.validators as vals - -from .Kinesis import Thorlabs_Kinesis - - -class Thorlabs_KLS1550(Instrument): - def __init__( - self, - name: str, - serial_number: str, - polling_speed_ms: int, - kinesis: Thorlabs_Kinesis, - **kwargs, - ): - super().__init__(name, **kwargs) - # Save Kinesis server reference - self.kinesis = kinesis - - # Initialization - self.serial_number = serial_number - self.polling_speed_ms = polling_speed_ms - self.kinesis.open_laser(self.serial_number) - self.kinesis.start_polling(self.serial_number, self.polling_speed_ms) - self.info = self.kinesis.laser_info(self.serial_number) - self.model = self.info[0].value.decode("utf-8") - self.version = self.info[4].value - - # Parameters - self.add_parameter( - "output_enabled", - get_cmd=self._get_output_enabled, - set_cmd=self._set_output_enabled, - vals=vals.Bool(), - unit="", - label="Laser output on/off", - docstring="Turn laser output on/off. Note that laser key switch must be on to turn laser output on.", - ) - - self.add_parameter( - "power", - get_cmd=self._get_power, - set_cmd=self._set_power, - vals=vals.Numbers(0, 0.007), # [ATTENTION] max power for simulator is 10mW - unit="W", - label="Power output", - ) - - self.connect_message() - - def get_idn(self): - return { - "vendor": "Thorlabs", - "model": self.model, - "firmware": self.version, - "serial": self.serial_number, - } - - def _get_output_enabled(self): - # First status bit represents 'output enabled' - return bool(self.kinesis.laser_status_bits(self.serial_number) & 1) - - def _set_output_enabled(self, value: bool): - if value: - self.kinesis.laser_enable_output(self.serial_number) +# -*- coding: utf-8 -*- +"""QCoDeS-Driver for Thorlab KLS1550 Laser source + +Authors: + iago-rst https://github.com/iago-rst, 2023 + Julien Barrier , 2023 +""" +import logging +from typing import Optional +from .private.LS import _Thorlabs_LS + +log = logging.getLogger(__name__) + +class Thorlabs_KLS1550(_Thorlabs_LS): + """Instrument driver for the Thorlabs KLS1550 + + Args: + name: Instrument name. + serial_number: Serial number of the device. + dll_path: Path to the kinesis dll for the instrument to use. + dll_dir: Directory in which the kinesis dll are stored. + simulation: Enables the simulation manager. + polling: Polling rate in ms. + """ + def __init__(self, + name: str, + serial_number: str, + dll_path: Optional[str] = None, + dll_dir: Optional[str] = None, + simulation: bool = False, + polling: int = 200, + **kwargs): + if dll_path: + self._dll_path = dll_path else: - self.kinesis.laser_disable_output(self.serial_number) - - def _get_power(self): - return self.kinesis.get_laser_power(self.serial_number) - - def _set_power(self, power_W: float): - self.kinesis.set_laser_power(self.serial_number, power_W) - - def disconnect(self): - self.kinesis.stop_polling(self.serial_number) - self.kinesis.close_laser(self.serial_number) - - def close(self): - self.disconnect() - super().close() + self._dll_path = 'Thorlabs.MotionControl.KCube.LaserSource.dll' + self._dll_dir: Optional[str] = dll_dir if dll_dir else None + super().__init__(name, serial_number, self._dll_path, self._dll_dir, + simulation, polling, **kwargs) diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/Kinesis.py b/qcodes_contrib_drivers/drivers/Thorlabs/Kinesis.py deleted file mode 100644 index d74e8c9c2..000000000 --- a/qcodes_contrib_drivers/drivers/Thorlabs/Kinesis.py +++ /dev/null @@ -1,132 +0,0 @@ -import os -import sys -import ctypes -from ctypes import wintypes - - -class Thorlabs_KinesisException(Exception): - pass - - -class Thorlabs_Kinesis(): - - def __init__(self, dll_path: str, sim: bool) -> None: - if sys.platform == "win32": - os.add_dll_directory(r"C:\Program Files\Thorlabs\Kinesis") - self.dll_path = dll_path - self.dll = ctypes.cdll.LoadLibrary(self.dll_path) - - if sim: - self.enable_simulation() - - # See Thorlabs Kinesis C API documentation for error description - def error_check(self, r: int, source:str) -> int: - if r != 0: - raise Thorlabs_KinesisException("Kinesis: [{}]: Error code {}".format(source, r)) - - return r - - # Connects to simulator (which must be already running but - # without GUI open, only one port) - def enable_simulation(self) -> None: - r = self.dll.TLI_InitializeSimulations() - # Throws error message even if simulation was successfull, - # check the log in the Kinesis simulator for confirmation instead - # self.error_check(r, 'Enable simulation') - - def disable_simulation(self) -> None: - r = self.dll.TLI_UninitializeSimulations() - self.error_check(r, 'Disable simulation') - - # Returns list with all the available (and closed?) devices - def device_list(self) -> list: - r = self.dll.TLI_BuildDeviceList() - if r == 0 or r == 16: - device_list = ctypes.create_string_buffer(250) - r2 = self.dll.TLI_GetDeviceListExt(ctypes.byref(device_list), 250) - self.error_check(r2, 'Get device list') - else: - self.error_check(r, 'Build device list') - - return device_list.value.decode("utf-8").rstrip(',').split(",") - - # Helper convertion function - def to_char_p(self, s: str) -> ctypes.c_char_p: - return ctypes.c_char_p(s.encode('ascii')) - - # Functions for starting and stopping connection to the laser source - def open_laser(self, serial_number: str) -> None: - r = self.dll.LS_Open(self.to_char_p(serial_number)) - self.error_check(r, 'Opening device') - - def start_polling(self, serial_number: str, polling_speed: int) -> None: - # Note: this function returns a boolean (for some reason) - r = self.dll.LS_StartPolling(self.to_char_p(serial_number), ctypes.c_int(polling_speed)) - if r != 1: - # 21 = FT_SpecificFunctionFail - The function failed to complete succesfully. - # If unsuccessful, then r seems to be -1749177856 but this is not explained in the documentation - self.error_check(21, 'Start polling') - - def close_laser(self, serial_number: str) -> None: - self.dll.LS_Close(self.to_char_p(serial_number)) - - def stop_polling(self, serial_number: str) -> None: - self.dll.LS_StopPolling(self.to_char_p(serial_number)) - - # Gets device information from a serial number - def laser_info(self, serial_number: str) -> list: - # Defining variables for information storing - model = ctypes.create_string_buffer(8) - model_size = ctypes.wintypes.DWORD(8) - type_num = ctypes.wintypes.WORD() - channel_num = ctypes.wintypes.WORD() - notes = ctypes.create_string_buffer(48) - notes_size = ctypes.c_ulong(48) - firmware_version = ctypes.wintypes.DWORD() - hardwware_version = ctypes.wintypes.WORD() - modification_state = ctypes.wintypes.WORD() - - r = self.dll.LS_GetHardwareInfo(self.to_char_p(serial_number), ctypes.byref(model), model_size, - ctypes.byref(type_num), ctypes.byref(channel_num), - ctypes.byref(notes), notes_size, ctypes.byref(firmware_version), - ctypes.byref(hardwware_version), ctypes.byref(modification_state)) - self.error_check(r, 'Get hardware info') - - return [model, type_num, channel_num, notes, firmware_version, hardwware_version, modification_state] - - # Returns a string with the status in binary (see doc. for bit meaning) - def laser_status_bits(self, serial_number: str) -> int: - # Note: status bits updated at polling interval, hence no LS_RequestStatusBits - integer = self.dll.LS_GetStatusBits(self.to_char_p(serial_number)) - if integer == 0x40000000: - return self.error_check(integer, 'Get status bits') - else: - return integer - - # Turnng the laser on/off (on only if safety key is on) - def laser_enable_output(self, serial_number: str) -> None: - r = self.dll.LS_EnableOutput(self.to_char_p(serial_number)) - self.error_check(r, 'Enable laser output') - - def laser_disable_output(self, serial_number: str) -> None: - r = self.dll.LS_DisableOutput(self.to_char_p(serial_number)) - self.error_check(r, 'Disable laser output') - - # Reading laser power - def get_laser_power(self, serial_number: str) -> float: - max_num = 32767 - max_power_W = 0.007 - num = self.dll.LS_GetPowerReading(self.to_char_p(serial_number)) - - return num/max_num * max_power_W - - # [ATTENTION] Setting laser power - def set_laser_power(self, serial_number: str, power_W: float) -> None: - # Maximum integer level for laser power - max_num = 32767 - max_power_W = 0.007 - # [ATTENTION] Maximum power is 7mW for actual device but 10mW for the simulator (somehow) - percentage = power_W/max_power_W - r = self.dll.LS_SetPower(self.to_char_p(serial_number), int(percentage*max_num)) - self.error_check(r, 'Set laser power') - diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/MFF10x.py b/qcodes_contrib_drivers/drivers/Thorlabs/MFF10x.py index 3cfb268d0..7a9dd1530 100644 --- a/qcodes_contrib_drivers/drivers/Thorlabs/MFF10x.py +++ b/qcodes_contrib_drivers/drivers/Thorlabs/MFF10x.py @@ -1,5 +1,5 @@ from qcodes import Instrument -from .APT import Thorlabs_APT, ThorlabsHWType +from .private.APT import Thorlabs_APT, ThorlabsHWType class Thorlabs_MFF10x(Instrument): diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/PM100D.py b/qcodes_contrib_drivers/drivers/Thorlabs/PM100D.py new file mode 100644 index 000000000..f8f2cdb1a --- /dev/null +++ b/qcodes_contrib_drivers/drivers/Thorlabs/PM100D.py @@ -0,0 +1,189 @@ +# -*- coding: utf-8 -*- +"""QCoDes-Driver for Thorlab PM100D Handheld Optical Power and Energy Meter Console +https://www.thorlabs.com/newgrouppage9.cfm?objectgroup_id=3341&pn=PM100D + +Authors: + Julien Barrier, +""" + +import logging +from time import sleep +from typing import Any + +from qcodes.instrument import VisaInstrument +from qcodes.parameters import Parameter, create_on_off_val_mapping +import qcodes.validators as vals + +log = logging.getLogger(__name__) + + +class Thorlab_PM100D(VisaInstrument): + """ + Class to represent a Thorlab PM100D optical powermeter + + status: beta-version + + Args: + name: name for the instrument + address: Visa Resource name to connect + terminator: Visa terminator + timeout. Visa timeout. + """ + def __init__(self, + name: str, + address: str, + terminator: str = '\n', + timeout: float = 20, + **kwargs: Any): + super().__init__(name, address, terminator=terminator, + timeout=timeout, **kwargs) + + self.averaging = Parameter( + 'averaging', + label='Averaging rate', + get_cmd='AVER?', + set_cmd='AVER', + vals=vals.Numbers(), + instrument=self + ) + + self.wavelength = Parameter( + 'wavelength', + label='Detected wavelength', + unit='m', + get_cmd='SENS:CORR:WAV?', + set_cmd='SENS:CORR:WAV {}', + scale=1e9, + vals=vals.Numbers(185e-9, 25e-6), + instrument=self + ) + + self.power = Parameter( + 'power', + label='Measured power', + unit='W', + get_cmd=self._get_power, + vals=vals.Numbers(), + instrument=self + ) + + self.attenuation = Parameter( + 'attenuation', + label='Attenuation', + unit='dB', + get_cmd='CORR?', + get_parser=float, + set_cmd='CORR {}', + set_parser=float, + vals=vals.Numbers(-60, 60), + instrument=self + ) + + self.power_range = Parameter( + 'power_range', + label='Power range', + unit='W', + get_cmd='SENS:POW:RANG:UPP?', + set_cmd='SENS:POW:RANG:UPP {}', + get_parser=float, + set_parser=float, + vals=vals.Numbers(), + instrument=self + ) + + self.auto_range = Parameter( + 'auto_range', + label='Auto range power', + get_cmd='SENS:POW:RANG:AUTO?', + set_cmd='SENS:POW:RANG:AUTO {}', + val_mapping=create_on_off_val_mapping(on_val='1', off_val='0'), + vals=vals.Ints(0, 1), + instrument=self + ) + + self.frequency = Parameter( + 'frequency', + unit='Hz', + get_cmd='MEAS:FREQ?', + get_parser=float, + vals=vals.Numbers(), + instrument=self + ) + + self.current = Parameter( + 'current', + label='Current', + unit='A', + get_cmd='MEAS:CURR?', + get_parser=float, + vals=vals.Numbers(), + instrument=self + ) + + self.current_range = Parameter( + 'current_range', + label='Current range', + unit='A', + get_cmd='SENS:CURR:RANG:UPP?', + get_parser=float, + vals=vals.Numbers(), + instrument=self + ) + + self.zero_value = Parameter( + 'zero_value', + unit='W', + get_cmd='CORR:COLL:ZERO:MAGN?', + get_parser=float, + vals=vals.Numbers(), + instrument=self + ) + + self.beam_diameter = Parameter( + 'beam_diameter', + label='Beam diameter', + unit='m', + get_cmd='CORR:BEAM?', + set_cmd='CORR:BEAM {}', + scale=1e3, + vals=vals.Numbers(), + instrument=self + ) + + self._set_transition_filter(512, 0) + self.averaging(300) + self._set_conf_power() + + self.connect_message() + + def _check_error(self) -> None: + err = self.ask('SYST:ERR?') + if err[:2] != '+0': + raise RuntimeError(f'PM100D call failed with error: {err}') + + def _set_conf_power(self) -> None: + """Set configuration to power mode + """ + self.write('CONF:POW') # set config to power mode + self.ask('ABOR;:STAT:OPER?') + self.write('INIT') + return None + + def _get_power(self) -> float: + """Get the power + """ + self._set_conf_power() + self.write('MEAS:POW') + sleep(.2) + power = self.ask('FETC?') + return float(power) + + def _set_transition_filter(self, positive: int, negative: int) -> None: + """Apply filters + """ + self.write(f'STAT:OPER:PTR {positive}') + sleep(.2) + self.write(f'STAT:OPER:NTR {negative}') + sleep(5) + self.ask('STAT:OPER?') # clear register + sleep(.2) diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/PRM1Z8.py b/qcodes_contrib_drivers/drivers/Thorlabs/PRM1Z8.py index f856233ae..05fdd1d4d 100644 --- a/qcodes_contrib_drivers/drivers/Thorlabs/PRM1Z8.py +++ b/qcodes_contrib_drivers/drivers/Thorlabs/PRM1Z8.py @@ -1,5 +1,5 @@ from qcodes import Instrument -from .APT import Thorlabs_APT, ThorlabsHWType +from .private.APT import Thorlabs_APT, ThorlabsHWType class Thorlabs_PRM1Z8(Instrument): diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/TDC001.py b/qcodes_contrib_drivers/drivers/Thorlabs/TDC001.py new file mode 100644 index 000000000..dcba4a894 --- /dev/null +++ b/qcodes_contrib_drivers/drivers/Thorlabs/TDC001.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +"""QCoDes-Driver for Thorlab TDC001 T-Cube Brushed DC Servo Motor Controller +https://www.thorlabs.com/thorproduct.cfm?partnumber=TDC001 + +Authors: + Julien Barrier, +""" +import logging +from typing import Optional + +from .private.CC import _Thorlabs_CC + +log = logging.getLogger(__name__) + +class Thorlabs_TDC001(_Thorlabs_CC): + """Instrument driver for the Thorlabs TDC001 servo motor controller + + Args: + name: Instrument name. + serial_number: Serial number of the device. + dll_path: Path to the kinesis dll for the instrument to use. + dll_dir: Directory in which the kinesis dll are stored. + simulation: Enables the simulation manager. Defaults to False. + polling: Polling rate in ms. Defaults to 200. + home: Sets the device to home state. Defaults to False. + """ + _CONDITIONS = ['homed', 'moved', 'stopped', 'limit_updated'] + def __init__(self, + name: str, + serial_number: str, + dll_path: Optional[str] = None, + dll_dir: Optional[str] = None, + simulation: bool = False, + polling: int = 200, + home: bool = False, + **kwargs): + if dll_path: + self._dll_path = dll_path + else: + self._dll_path = 'Thorlabs.MotionControl.TCube.DCServo.dll' + self._dll_dir: Optional[str] = dll_dir if dll_dir else None + super().__init__(name, serial_number, self._dll_path, self._dll_dir, + simulation, polling, home, **kwargs) diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/APT.py b/qcodes_contrib_drivers/drivers/Thorlabs/private/APT.py similarity index 94% rename from qcodes_contrib_drivers/drivers/Thorlabs/APT.py rename to qcodes_contrib_drivers/drivers/Thorlabs/private/APT.py index cd027dfed..f83f6f9da 100644 --- a/qcodes_contrib_drivers/drivers/Thorlabs/APT.py +++ b/qcodes_contrib_drivers/drivers/Thorlabs/private/APT.py @@ -5,8 +5,23 @@ class ThorlabsHWType(enum.Enum): PRM1Z8 = 31 - MFF10x = 48 - K10CR1 = 50 + BSC001 = 11 # 1 ch benchtop stepper driver + BSC101 = 12 # 1 ch benchtop stepper driver + BSC002 = 13 # 2 ch benchtop stepper driver + BDC101 = 14 # 1 ch benchtop dc servo driver + SCC001 = 21 # 1 ch stepper driver card (used within BSC102, 103 units) + DCC001 = 22 # 1 ch DC servo driver card (used within BDC102, 103 units) + ODC001 = 24 # 1 ch DC servo driver cube + OST001 = 25 # 1 ch stepper driver cube + MST601 = 26 # 2 ch modular stepper driver module + TST001 = 29 # 1 ch stepper driver T-cube + TDC001 = 31 # 1 ch DC servo driver T-cube + LTSxxx = 42 # LTS300/LTS150 long travel integrated driver/stages + L490MZ = 43 # L490MZ Integrated Driver/Labjack + BBD10x = 44 # 1/2/3 ch benchtop brushless DC servo driver + MFF10x = 48 # motorized filter flip + K10CR1 = 50 # steper motor rotation mount + KDC101 = 63 # 1 ch Brushed DC servo motor controller K-cube class ThorlabsException(Exception): diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/private/CC.py b/qcodes_contrib_drivers/drivers/Thorlabs/private/CC.py new file mode 100644 index 000000000..a134d4f53 --- /dev/null +++ b/qcodes_contrib_drivers/drivers/Thorlabs/private/CC.py @@ -0,0 +1,720 @@ +# -*- coding: utf-8 -*- +"""QCoDes- Base driver for Thorlab instruments using the CC commands + +Authors: + Julien Barrier, +""" +import ctypes +import logging +from time import sleep, time +from typing import Optional + +from qcodes.parameters import Parameter +from qcodes import validators as vals + +from .kinesis import _Thorlabs_Kinesis + +log = logging.getLogger(__name__) + +class _Thorlabs_CC(_Thorlabs_Kinesis): + """Instrument driver for Thorlabs instruments using the CC commands + + Args: + name: Instrument name. + serial_number: Serial number of the device. + dll_path: Path to the kinesis dll for the instrument to use. + dll_dir: Directory in which the kinesis dll are stored. + simulation: Enables the simulation manager. Defaults to False. + polling: Polling rate in ms. Defaults to 200. + home: Sets the device to home state. Defaults to False. + """ + _CONDITIONS = ['homed', 'moved', 'stopped', 'limit_updated'] + def __init__(self, + name: str, + serial_number: str, + dll_path: str, + dll_dir: Optional[str], + simulation: bool = False, + polling: int = 200, + home: bool = False, + **kwargs): + self._dll_path = dll_path + super().__init__(name, serial_number, + self._dll_path, dll_dir, simulation, + **kwargs) + + if self._dll.TLI_BuildDeviceList() == 0: + self._dll.CC_Open(self._serial_number) + self._start_polling(polling) + + self._info = self._get_hardware_info() + self.model = self._info[0].decode('utf-8') + self.version = self._info[4] + + self._load_settings() + self._set_limits_approach(1) + + sleep(3) + + self._clear_message_queue() + + if home: + if not self._can_home(): + self.homed = False + raise RuntimeError('Device `{}` is not homeable') + else: + self.go_home() + else: + self.homed = False + + + self.position = Parameter( + 'position', + label='Position', + get_cmd=self._get_position, + set_cmd=self._set_position, + vals=vals.Numbers(), + unit='\u00b0', + instrument=self + ) + + self.max_position = Parameter( + 'max_position', + label='Maximum position', + set_cmd=self._set_max_position, + get_cmd=self._get_max_position, + vals=vals.Numbers(0, 360), + instrument=self + ) + + self.min_position = Parameter( + 'min_position', + label='Minimum position', + set_cmd=self._set_min_position, + get_cmd=self._get_min_position, + vals=vals.Numbers(0, 360), + instrument=self + ) + + self.velocity = Parameter( + 'velocity', + label='Move velocity', + unit='\u00b0/s', + set_cmd=self._set_move_velocity, + get_cmd=self._get_move_velocity, + vals=vals.Numbers(0, 25), + instrument=self + ) + + self.jog_velocity = Parameter( + 'jog_velocity', + label='Jog velocity', + unit='\u00b0/s', + set_cmd=self._set_jog_velocity, + get_cmd=self._get_jog_velocity, + vals=vals.Numbers(0, 25), + instrument=self + ) + + self.homing_velocity = Parameter( + 'homing_velocity', + label='Homing velocity', + unit='\u00b0/s', + set_cmd=self._set_homing_velocity, + get_cmd=self._get_homing_velocity, + vals=vals.Numbers(0.1, 25), + instrument=self + ) + + self.acceleration = Parameter( + 'acceleration', + label='Move acceleration', + unit='\u00b0/s\u00b2', + set_cmd=self._set_move_acceleration, + get_cmd=self._get_move_acceleration, + vals=vals.Numbers(0, 25), + instrument=self + ) + + self.jog_acceleration = Parameter( + 'jog_acceleration', + label='Jog acceleration', + unit='\u00b0/s\u00b2', + set_cmd=self._set_jog_acceleration, + get_cmd=self._get_jog_acceleration, + vals=vals.Numbers(0, 25), + instrument=self + ) + + self.jog_mode = Parameter( + 'jog_mode', + set_cmd=self._set_jog_mode, + get_cmd=self._get_jog_mode, + vals=vals.Enum('continuous', 'stepped'), + instrument=self + ) + + self.jog_step_size = Parameter( + 'jog_step_size', + set_cmd=self._set_jog_step_size, + get_cmd=self._get_jog_step_size, + vals=vals.Numbers(0.0005, 360), + instrument=self + ) + + self.stop_mode = Parameter( + 'stop_mode', + set_cmd=self._set_stop_mode, + get_cmd=self._get_stop_mode, + vals=vals.Enum('immediate', 'profiled'), + instrument=self + ) + + self.soft_limits_mode = Parameter( + 'soft_limits_mode', + set_cmd=self._set_soft_limits_mode, + get_cmd=self._get_soft_limits_mode, + vals=vals.Enum('disallow', 'partial', 'all'), + instrument=self + ) + + self.backlash = Parameter( + 'backlash', + unit='\u00b0', + set_cmd=self._set_backlash, + get_cmd=self._get_backlash, + vals=vals.Numbers(0, 5), + instrument=self + ) + + self.connect_message() + + def identify(self) -> None: + """Sends a command to the device to make it identify itself""" + self.log.debug('identify the device') + self._dll.CC_Identify(self._serial_number) + + def get_idn(self) -> dict: + """Get device identifier""" + idparts = ['Thorlabs', self.model, self.version, self.serial_number] + return dict(zip(('vendor', 'model', 'serial', 'firmware'), idparts)) + + def go_home(self, block=True): + """Home the device: set the device to a known state and home position + + Args: + block: will wait for completion. Defaults to True. + """ + self.log.info('home the device.') + self._check_error(self._dll.CC_Home(self._serial_number)) + self.homed = True + if block: + self.wait_for_completion() + + def wait_for_completion(self, status: str = 'homed', max_time: float = 5) -> None: + """Wait for the current function to be finished. + + Args: + status: expected status. Defaults to 'homed'. + max_time: maximum waiting time for the internal loop. + """ + self.log.debug('wait for the current function to be completed') + message_type = ctypes.c_ushort() + message_id = ctypes.c_ushort() + message_data = ctypes.c_ulong() + cond = self._CONDITIONS.index(status) + + if status == 'stopped': + if not self.is_moving(): + return None + elif status == 'homed': + max_time = 0 + + while self._dll.CC_MessageQueueSize(self._serial_number) <= 0: + sleep(.2) + + self._dll.CC_WaitForMessage( + self._serial_number, ctypes.byref(message_type), + ctypes.byref(message_id), ctypes.byref(message_data) + ) + + start = time() + while int(message_type.value) != 2 or int(message_id.value) != cond: + end = time() + if end - start > max_time and max_time != 0: + raise RuntimeError(f'waited for {max_time} for {status} to complete' + f'message type: {message_type.value} ({message_id.value})') + + if self._dll.CC_MessageQueueSize(self._serial_number) <= 0: + sleep(.2) + continue + self._dll.CC_WaitForMessage( + self._serial_number, + ctypes.byref(message_type), + ctypes.byref(message_id), + ctypes.byref(message_data) + ) + return None + + def _device_unit_to_real(self, device_unit: int, unit_type: int) -> float: + """Converts a device unit to a real world unit + + Args: + device_unit: the device unit. + unit_type: the type of unit. Distance: 0, velocity: 1, acceleration: 2. + + Returns: + float: real unit value + """ + real_unit = ctypes.c_double() + ret = self._dll.CC_GetRealValueFromDeviceUnit( + self._serial_number, ctypes.c_int(device_unit), + ctypes.byref(real_unit), ctypes.c_int(unit_type) + ) + self._check_error(ret) + return real_unit.value + + def _real_to_device_unit(self, real_unit: float, unit_type: int) -> int: + """Converts a real world unit to a device unit + + Args: + real_unit: the real unit + unit_type: the type of unit. Distance: 0, velocity: 1, acceleration: 2 + + Returns: + int: device unit + """ + device_unit = ctypes.c_int() + ret = self._dll.CC_GetDeviceUnitFromRealValue( + self._serial_number, ctypes.c_double(real_unit), + ctypes.byref(device_unit), ctypes.c_int(unit_type) + ) + self._check_error(ret) + return device_unit.value + + def _get_backlash(self) -> float: + """Get the backlash distance setting (used to control hysteresis)""" + ret = self._dll.CC_GetBacklash(self._serial_number) + return self._device_unit_to_real(ret, 0) + + def _set_backlash(self, value: float) -> None: + """Set the backlash distance setting (used to control hysteresis)""" + val = self._real_to_device_unit(value, 0) + ret = self._dll.CC_SetBacklash(self._serial_number, ctypes.c_long(val)) + self._check_error(ret) + + def _get_homing_velocity(self) -> float: + vel = self._dll.CC_GetHomingVelocity(self._serial_number) + return self._device_unit_to_real(vel, 1) + + def _set_homing_velocity(self, velocity: float) -> None: + vel = self._real_to_device_unit(velocity, 1) + ret = self._dll.CC_SetHomingVelocity(self._serial_number, + ctypes.c_uint(vel)) + self._check_error(ret) + + def _get_jog_mode(self) -> str: + jog_mode = ctypes.c_short() + stop_mode = ctypes.c_short() + ret = self._dll.CC_GetJogMode(self._serial_number, + ctypes.byref(jog_mode), + ctypes.byref(stop_mode)) + self._check_error(ret) + if jog_mode.value == ctypes.c_short(0x01).value: + return 'continuous' + elif jog_mode.value == ctypes.c_short(0x02).value: + return 'stepped' + else: + raise RuntimeError('Unexpected value received from Kinesis') + + def _set_jog_mode(self, mode: str) -> None: + jog_mode = ctypes.c_short(0x00) + stop_mode = ctypes.c_short(0x00) + stop_mode = self._get_stop_mode() + + if mode == 'continuous': + jog_mode = ctypes.c_short(0x01) + elif mode == 'stepped': + jog_mode = ctypes.c_short(0x02) + if stop_mode == 'immediate': + stop_mode = ctypes.c_short(0x01) + elif stop_mode == 'profiled': + stop_mode = ctypes.c_short(0x02) + + ret = self._dll.CC_SetJogMode(self._serial_number, + jog_mode, stop_mode) + self._check_error(ret) + return None + + def _get_jog_step_size(self) -> float: + ret = self._dll.CC_GetJogStepSize(self._serial_number) + return self._device_unit_to_real(ret, 0) + + def _set_jog_step_size(self, step_size: float) -> None: + step = self._real_to_device_unit(step_size, 0) + ret = self._dll.CC_SetJogStepSize(self._serial_number, + ctypes.c_uint(step)) + self._check_error(ret) + return None + + def _check_connection(self) -> bool: + ret = self._dll.CC_CheckConnection(self._serial_number) + return bool(ret) + + def _get_stop_mode(self): + jog_mode = ctypes.c_short() + stop_mode = ctypes.c_short() + ret = self._dll.CC_GetJogMode(self._serial_number, + ctypes.byref(jog_mode), + ctypes.byref(stop_mode)) + self._check_error(ret) + if stop_mode.value == ctypes.c_short(0x01).value: + return 'immediate' + elif stop_mode.value == ctypes.c_short(0x02).value: + return 'profiled' + else: + raise RuntimeError('unexpected value received from Kinesis') + + def _set_stop_mode(self, mode: str) -> None: + jog_mode = ctypes.c_short(0x00) + stop_mode = ctypes.c_short(0x00) + + jmode = self._get_jog_mode() + if jmode == 'continuous': + jog_mode = ctypes.c_short(0x01) + elif jmode == 'stepped': + jog_mode = ctypes.c_short(0x02) + if mode == 'immediate': + stop_mode = ctypes.c_short(0x01) + elif mode == 'profiled': + stop_mode = ctypes.c_short(0x02) + + ret = self._dll.CC_SetJogMode(self._serial_number, + jog_mode, stop_mode) + self._check_error(ret) + return None + + def _get_max_position(self) -> float: + max_pos = ctypes.c_int() + self._dll.CC_GetStageAxisMaxPos(self._serial_number, max_pos) + return self._device_unit_to_real(max_pos.value, 0) + + def _set_max_position(self, max_val: float) -> None: + min_val = self._get_min_position() + min_val = self._real_to_device_unit(min_val, 0) + max_val = self._real_to_device_unit(max_val, 0) + ret = self._dll.CC_SetStageAxisLimits(self._serial_number, + ctypes.c_int(min_val), + ctypes.c_int(max_val)) + self._check_error(ret) + self.wait_for_completion('limit_updated') + + def _get_min_position(self) -> float: + min_pos = ctypes.c_int() + self._dll.CC_GetStageAxisMinPos(self._serial_number, min_pos) + return self._device_unit_to_real(min_pos.value, 0) + + def _set_min_position(self, min_val: float) -> None: + max_val = self._get_max_position() + max_val = self._real_to_device_unit(max_val, 0) + min_val = self._real_to_device_unit(min_val, 0) + ret = self._dll.CC_SetStageAxisLimits(self._serial_number, + ctypes.c_int(min_val), + ctypes.c_int(max_val)) + self._check_error(ret) + self.wait_for_completion('limit_updated') + + def _get_soft_limits_mode(self) -> str: + """Gets the software limits mode.""" + mode = ctypes.c_int16() + self._dll.CC_GetSoftLimitMode(self._serial_number, ctypes.byref(mode)) + if mode.value == ctypes.c_int16(0).value: + return 'disallow' + elif mode.value == ctypes.c_int16(1).value: + return 'partial' + elif mode.value == ctypes.c_int16(2).value: + return 'all' + else: + raise RuntimeError('unexpected value received from Kinesis') + + def _set_soft_limits_mode(self, mode: str) -> None: + """Sets the software limits mode""" + if mode == 'disallow': + lmode = ctypes.c_int16(0) + elif mode == 'partial': + lmode = ctypes.c_int16(1) + elif mode == 'all': + lmode = ctypes.c_int16(2) + + ret = self._dll.CC_SetLimitsSoftwareApproachPolicy(self._serial_number, + lmode) + self._check_error(ret) + + def _get_move_velocity(self) -> float: + acceleration = ctypes.c_int() + velocity = ctypes.c_int() + ret = self._dll.CC_GetVelParams(self._serial_number, + ctypes.byref(acceleration), + ctypes.byref(velocity)) + self._check_error(ret) + return self._device_unit_to_real(velocity.value, 1) + + def _set_move_velocity(self, velocity: float) -> None: + vel = self._real_to_device_unit(velocity, 1) + accel = self._real_to_device_unit(self._get_move_acceleration(), 2) + ret = self._dll.CC_SetVelParams(self._serial_number, + ctypes.c_int(accel), + ctypes.c_int(vel)) + self._check_error(ret) + + def _get_move_acceleration(self) -> float: + acceleration = ctypes.c_int() + velocity = ctypes.c_int() + ret = self._dll.CC_GetVelParams(self._serial_number, + ctypes.byref(acceleration), + ctypes.byref(velocity)) + self._check_error(ret) + return self._device_unit_to_real(acceleration.value, 2) + + def _set_move_acceleration(self, acceleration: float) -> None: + vel = self._real_to_device_unit(self._get_move_velocity(), 1) + accel = self._real_to_device_unit(acceleration, 2) + ret = self._dll.CC_SetVelParams(self._serial_number, + ctypes.c_int(accel), + ctypes.c_int(vel)) + self._check_error(ret) + + def _get_jog_velocity(self) -> float: + acceleration = ctypes.c_int() + velocity = ctypes.c_int() + ret = self._dll.CC_GetJogVelParams(self._serial_number, + ctypes.byref(acceleration), + ctypes.byref(velocity)) + self._check_error(ret) + return self._device_unit_to_real(velocity.value, 1) + + def _set_jog_velocity(self, velocity: float) -> None: + vel = self._real_to_device_unit(velocity, 1) + accel = self._real_to_device_unit(self._get_jog_acceleration(), 2) + ret = self._dll.CC_SetJogVelParams(self._serial_number, + ctypes.c_int(accel), + ctypes.c_int(vel)) + self._check_error(ret) + + def _get_jog_acceleration(self) -> float: + acceleration = ctypes.c_int() + velocity = ctypes.c_int() + ret = self._dll.CC_GetJogVelParams(self._serial_number, + ctypes.byref(acceleration), + ctypes.byref(velocity)) + self._check_error(ret) + return self._device_unit_to_real(acceleration.value, 2) + + def _set_jog_acceleration(self, acceleration: float) -> None: + vel = self._real_to_device_unit(self._get_jog_velocity(), 1) + accel = self._real_to_device_unit(acceleration, 2) + ret = self._dll.CC_SetJogVelParams(self._serial_number, + ctypes.c_int(accel), + ctypes.c_int(vel)) + self._check_error(ret) + + def _get_position(self) -> float: + pos = self._dll.CC_GetPosition(self._serial_number) + return self._device_unit_to_real(pos, 0) + + def _set_position(self, position: float, block: bool=True) -> None: + pos = self._real_to_device_unit(position, 0) + ret = self._dll.CC_SetMoveAbsolutePosition(self._serial_number, + ctypes.c_int(pos)) + self._check_error(ret) + ret = self._dll.CC_MoveAbsolute(self._serial_number) + self._check_error(ret) + if block: + diff = abs(self._get_position() - position) + diff -= 360 if diff > 180 else diff + while abs(diff) > .001: + sleep(.2) + diff = abs(self._get_position() - position) + diff -= 360 if diff > 180 else diff + + def is_moving(self) -> bool: + """check if the motor cotnroller is moving.""" + self.log.info('check if the motor is moving') + status_bit = ctypes.c_short() + self._dll.CC_GetStatusBits(self._serial_number, + ctypes.byref(status_bit)) + if status_bit.value & 0x10 or status_bit.value & 0x20: + return True + else: + return False + + def move_to(self, position: float, block=True) -> None: + """Move the device to the specified position. + The motor may need to be homed before a position can be set. + + Args: + position (float): the set position + block (bool, optional): will wait until complete. Defaults to True. + """ + self.log.info(f'move to {position}') + pos = self._real_to_device_unit(position, 0) + ret = self._dll.CC_MoveToPosition(self._serial_number, + ctypes.c_int(pos)) + self._check_error(ret) + + if block: + self.wait_for_completion(status='moved', max_time=15) + self.position.get() + + def move_by(self, displacement: float, block: bool = True) -> None: + """Move the motor by a relative amount + + Args: + displacement (float): amount to move + block (bool, optional): will wait until complete. Defaults to True. + """ + self.log.info(f'move by {displacement}') + dis = self._real_to_device_unit(displacement, 0) + ret = self._dll.CC_MoveRelative(self._serial_number, + ctypes.c_int(dis)) + self._check_error(ret) + if block: + self.wait_for_completion(status='moved') + self.position.get() + + def move_continuous(self, direction: str = 'forward') -> None: + """start moving at the current velocity in the specified direction + + Args: + direction: the required direction of travel. + Defaults to 'forward'. Accepts 'forward' or 'reverse' + """ + self.log.info(f'move continuously. direction: {direction}') + if direction == 'forward' or direction == 'forwards': + direc = ctypes.c_short(0x01) + elif direction == 'reverse' or direction == 'backward' or direction == 'backwards': + direc = ctypes.c_short(0x02) + else: + raise ValueError('direction unrecognised') + + ret = self._dll.CC_MoveAtVelocity(self._serial_number, direc) + self._check_error(ret) + self.position.get() + + def jog(self, direction: str = 'forward', block: bool = True) -> None: + """Performs a jog + + Args: + direction: the jog direction. Defaults to 'forward'. + Accepts 'forward' or 'reverse' + block: will wait until complete. Defaults to True. + """ + self.log.info(f'perform a jog; direction: {direction}') + if direction == 'forward' or direction == 'forwards': + direc = ctypes.c_short(0x01) + elif direction == 'reverse' or direction == 'backward' or direction == 'backwards': + direc = ctypes.c_short(0x02) + else: + raise ValueError('direction unrecognised') + + ret = self._dll.CC_MoveJog(self._serial_number, direc) + self._check_error(ret) + if self._get_jog_mode() =='stepped': + if block: + self.wait_for_completion(status='moved') + self.position.get() + + def stop(self, immediate: bool = False) -> None: + """Stop the current move + + Args: + immediate: + True: stops immediately (with risk of losing track of position). + False: stops using the current velocity profile. + Defaults to False. + """ + self.log.info('stop the current move') + if immediate: + ret = self._dll.CC_StopImmediate(self._serial_number) + else: + ret = self._dll.CC_StopProfiled(self._serial_number) + self._check_error(ret) + self.wait_for_completion(status='stopped') + self.position.get() + + def close(self): + self.log.info('close the device') + if self._simulation: + self.disable_simulation() + if hasattr(self, '_serial_number'): + self._stop_polling() + self._dll.CC_Close(self._serial_number) + + def _get_hardware_info(self) -> list: + """Gets the hardware information from the device + + Returns: + list: [model number, hardware type number, number of channels, + notes describing the device, firmware version, hardware version, + hardware modification state] + """ + model = ctypes.create_string_buffer(8) + model_size = ctypes.c_ulong(8) + type_num = ctypes.c_ushort() + channel_num = ctypes.c_ushort() + notes = ctypes.create_string_buffer(48) + notes_size = ctypes.c_ulong(48) + firmware_version = ctypes.c_ulong() + hardware_version = ctypes.c_ushort() + modification_state = ctypes.c_ushort() + + ret = self._dll.CC_GetHardwareInfo( + self._serial_number, + ctypes.byref(model), model_size, + ctypes.byref(type_num), ctypes.byref(channel_num), + ctypes.byref(notes), notes_size, ctypes.byref(firmware_version), + ctypes.byref(hardware_version), ctypes.byref(modification_state) + ) + + self._check_error(ret) + return [model.value, type_num.value, channel_num.value, + notes.value, firmware_version.value, hardware_version.value, + modification_state.value] + + def _load_settings(self): + """Update device with stored settings""" + self.log.info('update the device with the stored settings') + self._dll.CC_LoadSettings(self._serial_number) + return None + + def _set_limits_approach(self, limit: int) -> None: + disallow_illegal_moves = ctypes.c_int16(0) + allow_partial_moves = ctypes.c_int16(1) + allow_all_moves = ctypes.c_int16(2) + limits_approach_policy = ctypes.c_int16(limit) + + self._dll.CC_SetLimitsSoftwareApproachPolicy( + self._serial_number, ctypes.byref(disallow_illegal_moves), + ctypes.byref(allow_partial_moves), ctypes.byref(allow_all_moves), + ctypes.byref(limits_approach_policy) + ) + return None + + def _start_polling(self, polling: int) -> None: + self.log.info('start polling') + pol = ctypes.c_int(polling) + self._dll.CC_StartPolling(self._serial_number, ctypes.byref(pol)) + return None + + def _stop_polling(self) -> None: + self.log.info('stop polling') + self._dll.CC_StopPolling(self._serial_number) + + def _clear_message_queue(self) -> None: + self.log.info('clear messages queue') + self._dll.CC_ClearMessageQueue(self._serial_number) + return None + + def _can_home(self) -> bool: + ret = self._dll.CC_CanHome(self._serial_number) + return bool(ret) diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/private/LS.py b/qcodes_contrib_drivers/drivers/Thorlabs/private/LS.py new file mode 100644 index 000000000..fc595ad66 --- /dev/null +++ b/qcodes_contrib_drivers/drivers/Thorlabs/private/LS.py @@ -0,0 +1,202 @@ +# -*- coding: utf-8 -*- +"""QCoDes- Base driver for Thorlab instruments using the LS commands + +Authors: + iago-rst https://github.com/iago-rst, 2023 + Julien Barrier , 2023 +""" +import ctypes +import logging +from time import sleep +from typing import Optional + +from qcodes.parameters import Parameter +from qcodes import validators as vals + +from .kinesis import _Thorlabs_Kinesis + +log = logging.getLogger(__name__) + +class _Thorlabs_LS(_Thorlabs_Kinesis): + """Instrument driver for Thorlabs instruments using the CC commands + + Args: + name: Instrument name. + serial_number: Serial number of the device. + dll_path: Path to the kinesis dll for the instrument to use. + dll_dir: Directory in which the kinesis dll are stored. + simulation: Enables the simulation manager. Defaults to False. + polling: Polling rate in ms. Defaults to 200. + home: Sets the device to home state. Defaults to False. + """ + _CONDITIONS = ['homed', 'moved', 'stopped', 'limit_updated'] + def __init__(self, + name: str, + serial_number: str, + dll_path: str, + dll_dir: Optional[str], + simulation: bool = False, + polling: int = 200, + home: bool = False, + **kwargs): + self._dll_path = dll_path + super().__init__(name, serial_number, + self._dll_path, dll_dir, simulation, + **kwargs) + + if self._dll.TLI_BuildDeviceList() == 0: + self._open_laser() + self._start_polling(polling) + + self._info = self._get_hardware_info() + self.model = self._info[0].decode('utf-8') + self.version = self._info[4] + + self._load_settings() + self._set_limits_approach(1) + + sleep(3) + + self._clear_message_queue() + + if home: + if not self._can_home(): + self.homed = False + raise RuntimeError('Device `{}` is not homeable') + else: + self.go_home() + else: + self.homed = False + + + self.output_enabled = Parameter( + 'output_enabled', + label='Output enabled', + get_cmd=self._get_output_enabled, + set_cmd=self._set_output_enabled, + vals=vals.Bool(), + docstring='turn laser output off/on. Note that laser key switch must be turned on to turn output on.', + instrument=self + ) + + self.power = Parameter( + 'power', + label='Power output', + unit='W', + get_cmd=self._get_power, + set_cmd=self._set_power, + vals=vals.Numbers(0, .007), + instrument=self + ) + + self.connect_message() + + def identify(self) -> None: + """Sends a command to the device to make it identify itself""" + self._dll.LS_Identify(self._serial_number) + + def get_idn(self) -> dict: + """Get device identifier""" + idparts = ['Thorlabs', self.model, self.version, self.serial_number] + return dict(zip(('vendor', 'model', 'serial', 'firmware'), idparts)) + + def _get_status_bits(self) -> int: + status = self._dll.LS_GetStatusBits(self._serial_number) + if status == 0x40000000: + raise ValueError() + else: + return int(status) + + def _get_output_enabled(self) -> bool: + return bool(self._get_status_bits() & 1) + + def _set_output_enabled(self, value: bool) -> None: + if value: + self.enable_output() + else: + self.disable_output() + + def enable_output(self) -> None: + ret = self._dll.LS_EnableOutput(self._serial_number) + self._check_error(ret) + + def disable_output(self) -> None: + ret = self._dll.LS_DisableOutput(self._serial_number) + self._check_error(ret) + + def _get_power(self) -> float: + max_num = 32767 + max_power = .007 + num = self._dll.LS_GetPowerReading(self._serial_number) + return num/max_num * max_power + + def _set_power(self, power: float) -> None: + max_num = 32767 + max_power = .007 + percent = power / max_power + ret = self._dll.LS_SetPower(self._serial_number, int(percent*max_num)) + self._check_error(ret) + + def close(self): + if self._simulation: + self.disable_simulation() + if hasattr(self, '_serial_number'): + self._stop_polling() + self._dll.LS_Close(self._serial_number) + + def _get_hardware_info(self) -> list: + """Gets the hardware information from the device + + Returns: + list: [model number, hardware type number, number of channels, + notes describing the device, firmware version, hardware version, + hardware modification state] + """ + model = ctypes.create_string_buffer(8) + model_size = ctypes.c_ulong(8) + type_num = ctypes.c_ushort() + channel_num = ctypes.c_ushort() + notes = ctypes.create_string_buffer(48) + notes_size = ctypes.c_ulong(48) + firmware_version = ctypes.c_ulong() + hardware_version = ctypes.c_ushort() + modification_state = ctypes.c_ushort() + + ret = self._dll.LS_GetHardwareInfo( + self._serial_number, + ctypes.byref(model), model_size, + ctypes.byref(type_num), ctypes.byref(channel_num), + ctypes.byref(notes), notes_size, ctypes.byref(firmware_version), + ctypes.byref(hardware_version), ctypes.byref(modification_state) + ) + + self._check_error(ret) + return [model.value, type_num.value, channel_num.value, + notes.value, firmware_version.value, hardware_version.value, + modification_state.value] + + def _load_settings(self): + """Update device with stored settings""" + self._dll.LS_LoadSettings(self._serial_number) + return None + + def _open_laser(self) -> None: + ret = self._dll.LS_Open(self._serial_number) + self._check_error(ret) + + def _close_laser(self) -> None: + ret = self._dll.LS_Close(self._serial_number) + self._check_error(ret) + + def _start_polling(self, polling: int): + pol = ctypes.c_int(polling) + ret = self._dll.LS_StartPolling(self._serial_number, ctypes.byref(pol)) + self._check_error(ret) + return None + + def _stop_polling(self): + self._dll.LS_StopPolling(self._serial_number) + + def _clear_message_queue(self) -> None: + self._dll.LS_ClearMessageQueue(self._serial_number) + return None diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/private/__init__.py b/qcodes_contrib_drivers/drivers/Thorlabs/private/__init__.py new file mode 100644 index 000000000..e997ee4c8 --- /dev/null +++ b/qcodes_contrib_drivers/drivers/Thorlabs/private/__init__.py @@ -0,0 +1,39 @@ +ConnexionErrors = { + 0x00: 'Success', + 0x01: 'The FTDI functions have not been initialized', + 0x02: 'The device could not be found', + 0x03: 'The device must be opened before it can be accessed', + 0x04: 'An I/O error has occured in the FTDI chip', + 0x05: 'There are insufficient resources to run this application', + 0x06: 'An invalid parameter has been supplied to the device', + 0x07: 'The device is no longer present', + 0x08: 'The device detected does not match that expected', + 0x10: 'The library for this device could not be found', + 0x11: 'No functions available for this device', + 0x12: 'The function is not available for this device', + 0x13: 'Bad function pointer detected', + 0x14: 'The generic function failed to complete successfully', + 0x15: 'The specific function failed to complete succesfully' +} + +GeneralErrors = { + 0x20: 'Attempt to open a device that was already open', + 0x21: 'The device has stopped responding', + 0x22: 'This function has not been implemented', + 0x23: 'The device has reported a fault', + 0x24: 'The function could not be completed at this time', + 0x28: 'The function could not be completed because the device is disconnected', + 0x29: 'The firmware has thrown an error', + 0x2A: 'The device has failed to initialize', + 0x2B: 'An invalid channel address was supplied' +} + +MotorErrors = { + 0x25: 'The device cannot perform this function until it has been homed', + 0x26: 'The function cannot be performed as it would result in an illegal position', + 0x27: 'An invalid velocity parameter was supplied. The velocity must be greater than zero', + 0x2C: 'This device does not support homing. Check the limit switch parameters are correct', + 0x2D: 'An invalid jog mode was supplied for the jog function', + 0x2E: 'There is no motor parameters available to convert real world units', + 0x2F: 'Command temporarily unavailable, device may be busy.' +} \ No newline at end of file diff --git a/qcodes_contrib_drivers/drivers/Thorlabs/private/kinesis.py b/qcodes_contrib_drivers/drivers/Thorlabs/private/kinesis.py new file mode 100644 index 000000000..06eff4ebc --- /dev/null +++ b/qcodes_contrib_drivers/drivers/Thorlabs/private/kinesis.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +"""QCoDeS base drivers for Thorlabs Kinesis instruments + +Authors: + Julien Barrier, +""" +import os +import sys +import ctypes +from typing import Any, Optional + +from qcodes.instrument import Instrument +from . import GeneralErrors, MotorErrors, ConnexionErrors + +class _Thorlabs_Kinesis(Instrument): + """A base class for Thorlabs kinesis instruments + + Args: + name: Instrument name. + serial_number: Serial number of the device. + dll_path: Path to the kinesis dll for the instrument to use. + dll_dir: Directory in which the kinesis dll are stored. + simulation: Enables the simulation manager. Defaults to False. + """ + def __init__(self, + name: str, + serial_number: str, + dll_path: str, + dll_dir: Optional[str] = None, + simulation: bool = False, + **kwargs): + super().__init__(name, **kwargs) + self.serial_number = serial_number + self._serial_number = ctypes.c_char_p(self.serial_number.encode('ascii')) + self._dll_path = dll_path + self._dll_dir: Optional[str] = dll_dir if dll_dir else r'C:\Program Files\Thorlabs\Kinesis' + if sys.platform != 'win32': + self._dll: Any = None + raise OSError('Thorlabs Kinesis only works on Windows') + else: + os.add_dll_directory(self._dll_dir) + self._dll = ctypes.cdll.LoadLibrary(self._dll_path) + + self._simulation = simulation + if self._simulation: + self.enable_simulation() + + self._device_info = dict(zip( + ['type_ID', 'description', 'PID', 'is_known_type', 'motor_type', + 'is_piezo', 'is_laser', 'is_custom', 'is_rack', 'max_channels'], + self._get_device_info())) + self._type_ID = self._device_info['type_ID'] + self._description = self._device_info['description'] + self._PID = self._device_info['PID'] + self._is_known_type = self._device_info['is_known_type'] + self._motor_type = self._device_info['motor_type'] + self._is_piezo = self._device_info['is_piezo'] + self._is_laser = self._device_info['is_laser'] + self._is_custom = self._device_info['is_custom'] + self._is_rack = self._device_info['is_rack'] + self._max_channels = self._device_info['max_channels'] + + def _get_device_info(self) -> list: + """Get the device information from the USB port + + Returns: + list: [type id, description, PID, is known type, motor type, + is piezo, is laser, is custom type, is rack, max channels] + """ + type_id = ctypes.c_ulong() + description = ctypes.c_char() + pid = ctypes.c_ulong() + is_known_type = ctypes.c_bool() + motor_type = ctypes.c_int() + is_piezo_device = ctypes.c_bool() + is_laser = ctypes.c_bool() + is_custom_type = ctypes.c_bool() + is_rack = ctypes.c_bool() + max_channels = ctypes.c_bool() + + ret = self._dll.TLI_GetDeviceInfo( + ctypes.byref(self._serial_number), + ctypes.byref(type_id), + ctypes.byref(description), + ctypes.byref(pid), + ctypes.byref(is_known_type), + ctypes.byref(motor_type), + ctypes.byref(is_piezo_device), + ctypes.byref(is_laser), + ctypes.byref(is_custom_type), + ctypes.byref(is_rack), + ctypes.byref(max_channels) + ) + self._check_error(ret) + return [type_id.value, description.value, pid.value, + is_known_type.value, motor_type.value, is_piezo_device.value, + is_laser.value, is_custom_type.value, is_rack.value, + max_channels.value] + + def _check_error(self, status: int) -> None: + if status != 0: + if status in ConnexionErrors: + raise ConnectionError(f'{ConnexionErrors[status]} ({status})') + elif status in GeneralErrors: + raise OSError(f'{GeneralErrors[status]} ({status})') + elif status in MotorErrors: + raise RuntimeError(f'{MotorErrors[status]} ({status})') + else: + raise ValueError(f'Unknown error code ({status})') + else: + pass + return None + + def enable_simulation(self) -> None: + """Initialise a connection to the simulation manager, which must already be running""" + self._dll.TLI_InitializeSimulations() + + def disable_simulation(self) -> None: + """Uninitialize a connection to the simulation manager, which must be running""" + self._dll.TLI_UninitializeSimulations()