Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/ref/ps6000a/conversions.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ the PicoScope needs to be initialized using `scope.open_unit()` followed by the
>>> import pypicosdk as psdk
>>> scope = psdk.ps6000a()
>>> scope.open_unit(resolution=psdk.RESOLUTION._8BIT)
>>> scope.mv_to_adc(100, channel_range=psdk.RANGE.V1)
>>> scope.mv_to_adc(100, channel=psdk.CHANNEL.A)
3251
>>> scope.close_unit()
```
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/ref/psospa/conversions.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ the PicoScope needs to be initialized using `scope.open_unit()` followed by the
>>> import pypicosdk as psdk
>>> scope = psdk.psospa()
>>> scope.open_unit(resolution=psdk.RESOLUTION._8BIT)
>>> scope.mv_to_adc(100, channel_range=psdk.RANGE.V1)
>>> scope.mv_to_adc(100, channel=psdk.CHANNEL.A)
3251
>>> scope.close_unit()
```
Expand Down
Empty file added pypicosdk/_classes/__init__.py
Empty file.
33 changes: 33 additions & 0 deletions pypicosdk/_classes/_channel_class.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
This file contains a class to hold the channel data
i.e. range information and probe scale information
"""
from dataclasses import dataclass
import numpy as np
if __name__ != '__main__':
from .. import constants as cst
else:
from pypicosdk import constants as cst


@dataclass
class ChannelClass:
"Dataclass containing channel information"
range: cst.RANGE
range_mv: int
probe_scale: float
ylim_mv: int
ylim_v: float

def __init__(self, ch_range: cst.RANGE, probe_scale: float):
self.range = ch_range
self.probe_scale = probe_scale
self.range_mv = cst.RANGE_LIST[ch_range]
self.range_v = self.range_mv / 1000
self.ylim_mv = np.array([-self.range_mv, self.range_mv]) * probe_scale
self.ylim_v = self.ylim_mv / 1000


if __name__ == '__main__':
test = ChannelClass(ch_range=cst.RANGE.V1, probe_scale=10)
print(test)
96 changes: 52 additions & 44 deletions pypicosdk/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
Copyright (C) 2025-2025 Pico Technology Ltd. See LICENSE file for terms.
"""

# flake8: noqa
# pylint: skip-file
import ctypes
import os
import warnings

import numpy as np
import numpy.ctypeslib as npc

from ._classes._channel_class import ChannelClass
from .error_list import ERROR_STRING
from .constants import *
from . import constants as cst
Expand Down Expand Up @@ -43,15 +46,13 @@ def __init__(self, dll_name, *args, **kwargs):

# Setup class variables
self.handle = ctypes.c_short()
self.range = {}
self.probe_scale = {}
self.channel_db: dict[int, ChannelClass] = {}
self.resolution = None
self.max_adc_value = None
self.min_adc_value = None
self.over_range = 0
self._actual_interval = 0

self.ylim = (0, 0)
self.last_used_volt_unit: str = 'mv'

def __exit__(self):
self.close_unit()
Expand Down Expand Up @@ -300,7 +301,7 @@ def _get_enabled_channel_flags(self) -> int:
int: Decimal of enabled channels
"""
enabled_channel_byte = 0
for channel in self.range:
for channel in self.channel_db:
enabled_channel_byte += 2**channel
return enabled_channel_byte

Expand Down Expand Up @@ -819,22 +820,21 @@ def get_minimum_timebase_stateless(self) -> dict:
}

# Data conversion ADC/mV & ctypes/int
def mv_to_adc(self, mv: float, channel_range: int, channel: CHANNEL = None) -> int:
def mv_to_adc(self, mv: float, channel: CHANNEL = None) -> int:
"""
Converts a millivolt (mV) value to an ADC value based on the device's
maximum ADC range.

Args:
mv (float): Voltage in millivolts to be converted.
channel_range (int): Range of channel in millivolts i.e. 500 mV.
channel (CHANNEL, optional): Channel associated with ``mv``. The
probe scaling for the channel will be applied if provided.

Returns:
int: ADC value corresponding to the input millivolt value.
"""
scale = self.probe_scale.get(channel, 1)
channel_range_mv = RANGE_LIST[channel_range]
scale = self.channel_db[channel].probe_scale
channel_range_mv = self.channel_db[channel].range_mv
return int(((mv / scale) / channel_range_mv) * self.max_adc_value)

def _adc_conversion(
Expand All @@ -845,8 +845,8 @@ def _adc_conversion(
) -> float | np.ndarray:
"""Converts ADC value or array to mV or V using the stored probe scaling."""
unit_scale = _get_literal(output_unit, OutputUnitV_M)
channel_range_mv = RANGE_LIST[self.range[channel]]
channel_scale = self.probe_scale[channel]
channel_range_mv = self.channel_db[channel].range_mv
channel_scale = self.channel_db[channel].probe_scale
return (((adc / self.max_adc_value) * channel_range_mv) * channel_scale) / unit_scale

def _adc_to_(
Expand All @@ -870,6 +870,10 @@ def _adc_to_(
Returns:
dict | float | np.ndarray: _description_
"""

# Update last used
self.last_used_volt_unit = unit

if isinstance(data, dict):
for channel, adc in data.items():
data[channel] = self._adc_conversion(adc, channel, output_unit=unit)
Expand Down Expand Up @@ -898,6 +902,7 @@ def adc_to_mv(
Returns:
dict, int, float, np.ndarray: Data converted into millivolts (mV)
"""
self.last_used_volt_unit = 'mv' # Update last used
return self._adc_to_(data, channel, unit='mv')

def adc_to_volts(
Expand All @@ -919,6 +924,7 @@ def adc_to_volts(
Returns:
dict, int, float, np.ndarray: Data converted into volts (V)
"""
self.last_used_volt_unit = 'v' # Update last used
return self._adc_to_(data, channel, unit='v')

def _thr_hyst_mv_to_adc(
Expand All @@ -929,11 +935,12 @@ def _thr_hyst_mv_to_adc(
hysteresis_upper_mv,
hysteresis_lower_mv
) -> tuple[int, int, int, int]:
if channel in self.range:
upper_adc = self.mv_to_adc(threshold_upper_mv, self.range[channel], channel)
lower_adc = self.mv_to_adc(threshold_lower_mv, self.range[channel], channel)
hyst_upper_adc = self.mv_to_adc(hysteresis_upper_mv, self.range[channel], channel)
hyst_lower_adc = self.mv_to_adc(hysteresis_lower_mv, self.range[channel], channel)
if channel in self.channel_db:
ch_range = self.channel_db[channel].range
upper_adc = self.mv_to_adc(threshold_upper_mv, channel)
lower_adc = self.mv_to_adc(threshold_lower_mv, channel)
hyst_upper_adc = self.mv_to_adc(hysteresis_upper_mv, channel)
hyst_lower_adc = self.mv_to_adc(hysteresis_lower_mv, channel)
else:
upper_adc = int(threshold_upper_mv)
lower_adc = int(threshold_lower_mv)
Expand All @@ -956,43 +963,39 @@ def _change_power_source(self, state: POWER_SOURCE) -> 0:
state
)

def _set_ylim(self, ch_range: RANGE | range_literal) -> None:
"""
Update the scope self.ylim with the largest channel range

Args:
ch_range (RANGE | range_literal): Range of current channel
"""
# Convert to mv
ch_range = RANGE_LIST[ch_range]

# Compare largest value
max_ylim = max(self.ylim[1], ch_range)
min_ylim = -max_ylim
self.ylim = (min_ylim, max_ylim)

def get_ylim(self, unit: OutputUnitV_L = 'mv') -> tuple[float, float]:
def get_ylim(self, unit: str | None | OutputUnitV_L = None) -> tuple[float, float]:
"""
Returns the ylim of the widest channel range as a tuple.
Returns the ylim of the widest channel range as a tuple. The unit is taken from
the last used adc to voltage conversion, but can be overwritten by declaring a
`unit` variable.
Ideal for pyplot ylim function.

Args:
unit (str): 'mv' or 'v'. Depending on whether your data is in mV
or Volts.
unit (str | None, optional): Overwrite the ylim unit using `'mv'` or `'v'`.
If None, The unit will be taken from the last voltage unit conversion.

Returns:
tuple[float, float]: Minium and maximum range values
tuple[float,float]: Minium and maximum range values

Examples:
>>> from matplotlib import pyplot as plt
>>> ...
>>> plt.ylim(scope.get_ylim())
"""
if unit is None:
# Collect last used voltage unit
unit = self.last_used_volt_unit

# Get largest channel range
largest_range_index = max(
self.channel_db,
key=lambda ch: self.channel_db[ch].range_mv
)
unit = unit.lower()
if unit.lower() == 'mv':
return self.ylim
elif unit.lower():
return self.ylim[0] / 1000, self.ylim[1] / 1000
if unit == 'mv':
return self.channel_db[largest_range_index].ylim_mv
elif unit == 'v':
return self.channel_db[largest_range_index].ylim_v

def set_device_resolution(self, resolution: RESOLUTION) -> None:
"""Configure the ADC resolution using ``ps6000aSetDeviceResolution``.
Expand Down Expand Up @@ -1042,8 +1045,8 @@ def set_simple_trigger(
channel = _get_literal(channel, channel_map)
direction = _get_literal(direction, trigger_dir_m)

if channel in self.range:
threshold_adc = self.mv_to_adc(threshold_mv, self.range[channel], channel)
if channel in self.channel_db:
threshold_adc = self.mv_to_adc(threshold_mv, channel)
else:
threshold_adc = int(threshold_mv)

Expand Down Expand Up @@ -1442,12 +1445,12 @@ def set_data_buffer_for_enabled_channels(
channels_buffer = {}
# Rapid
if captures > 0:
for channel in self.range:
for channel in self.channel_db:
np_buffer = self.set_data_buffer_rapid_capture(channel, samples, captures, segment, datatype, ratio_mode, action=ACTION.ADD)
channels_buffer[channel] = np_buffer
# Single
else:
for channel in self.range:
for channel in self.channel_db:
channels_buffer[channel] = self.set_data_buffer(channel, samples, segment, datatype, ratio_mode, action=ACTION.ADD)

return channels_buffer
Expand Down Expand Up @@ -1751,6 +1754,9 @@ def run_simple_block_capture(
>>> buffers = scope.run_simple_block_capture(timebase=3, samples=1000)
"""

# Update last used
self.last_used_volt_unit = output_unit

# Create data buffers
channel_buffer = \
self.set_data_buffer_for_enabled_channels(samples, segment, datatype, ratio_mode)
Expand Down Expand Up @@ -1809,6 +1815,8 @@ def run_simple_rapid_block_capture(
tuple[dict,np.ndarray]: Dictionary of channel buffers and the time
axis (numpy array).
"""
# Update last used
self.last_used_volt_unit = output_unit

# Segment set to 0
segment = 0
Expand Down
33 changes: 26 additions & 7 deletions pypicosdk/psospa.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
"""Copyright (C) 2025-2025 Pico Technology Ltd. See LICENSE file for terms."""

# flake8: noqa
# pylint: skip-file
import ctypes
from typing import override, Literal
import json
from warnings import warn

from ._classes._channel_class import ChannelClass
from . import constants as cst
from .constants import *
from .constants import (
TIME_UNIT
)
from . import common as cmn
from .common import PicoSDKException
from .base import PicoScopeBase
from .shared.ps6000a_psospa import shared_ps6000a_psospa
Expand Down Expand Up @@ -61,12 +67,13 @@ def open_unit(self, serial_number:str=None, resolution:RESOLUTION | resolution_l
@override
def set_channel_on(
self,
channel:CHANNEL,
range:RANGE,
coupling:COUPLING=COUPLING.DC,
offset:float=0,
bandwidth:BANDWIDTH_CH=BANDWIDTH_CH.FULL,
range_type:PICO_PROBE_RANGE_INFO=PICO_PROBE_RANGE_INFO.X1_PROBE_NV
channel: CHANNEL,
range: RANGE,
coupling: COUPLING = COUPLING.DC,
offset: float = 0,
bandwidth: BANDWIDTH_CH = BANDWIDTH_CH.FULL,
range_type: PICO_PROBE_RANGE_INFO = PICO_PROBE_RANGE_INFO.X1_PROBE_NV,
probe_scale: float = 1.0,
) -> int:
"""
Enable and configure a specific channel on the device with given parameters.
Expand All @@ -84,8 +91,16 @@ def set_channel_on(
Bandwidth limit setting for the channel. Defaults to full bandwidth.
range_type (PICO_PROBE_RANGE_INFO, optional):
Specifies the probe range type. Defaults to X1 probe (no attenuation).
probe_scale (float, optional): Probe attenuation factor e.g. 10 for x10 probe.
Default value of 1.0 (x1).
"""
self.range[channel] = range
if probe_scale != 1:
warn(
f'Ensure selected channel range of {cst.range_literal.__args__[range]} ' +
f'accounts for attenuation of x{probe_scale} at scope input',
cmn.ProbeScaleWarning)

self.channel_db[channel] = ChannelClass(ch_range=range, probe_scale=probe_scale)

range_max = ctypes.c_int64(RANGE_LIST[range] * 1_000_000)
range_min = ctypes.c_int64(-range_max.value)
Expand All @@ -103,6 +118,7 @@ def set_channel_on(
)
return status


@override
def get_nearest_sampling_interval(self, interval_s:float, round_faster:int=True) -> dict:
"""
Expand Down Expand Up @@ -267,6 +283,7 @@ def set_led_colours(
hue = [hue]
saturation = [saturation]


if isinstance(hue[0], str):
hue = [led_colours_m[i] for i in hue]

Expand All @@ -287,6 +304,7 @@ def set_led_colours(
array_len,
)


def set_all_led_states(self,state:str|led_state_l):
"""
Sets the state of all LED's on the PicoScope.
Expand All @@ -303,6 +321,7 @@ def set_led_states(self, led:str|led_channel_l|list[led_channel_l], state:str|le
Sets the state for a selected LED. Between default behaviour (auto),
on or off.


Args:
led (str): The selected LED. Must be one of these values:
`'A'`, `'B'`, `'C'`, `'D'`, `'E'`, `'F'`, `'G'`, `'H'`, `'AWG'`, `'AUX'`.
Expand Down
1 change: 0 additions & 1 deletion pypicosdk/shared/_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,3 @@

class _ProtocolBase(Protocol):
"""Protocol placeholder class for shared methods"""
def _set_ylim(self, *args, **kwargs): ...
Loading