diff --git a/qiskit_experiments/data_processing/data_action.py b/qiskit_experiments/data_processing/data_action.py index 9427bd508a..8f743a681d 100644 --- a/qiskit_experiments/data_processing/data_action.py +++ b/qiskit_experiments/data_processing/data_action.py @@ -13,7 +13,9 @@ """Defines the steps that can be used to analyse data.""" from abc import ABCMeta, abstractmethod -from typing import Any, List, Optional, Tuple +from typing import Generator, Iterator, Optional + +import numpy as np class DataAction(metaclass=ABCMeta): @@ -29,50 +31,40 @@ def __init__(self, validate: bool = True): """ self._validate = validate - @abstractmethod - def _process(self, datum: Any, error: Optional[Any] = None) -> Tuple[Any, Any]: - """ - Applies the data processing step to the datum. + def _process(self, gen_datum: Iterator) -> Generator: + """Applies the data processing step to the datum. Args: - datum: A single item of data which will be processed. - error: An optional error estimation on the datum that can be further propagated. + gen_datum: A generator of unprocessed data. Each entry is a tuple of data and error. - Returns: - processed data: The data that has been processed along with the propagated error. + Yields: + A tuple of processed data and error. """ + yield from gen_datum - @abstractmethod - def _format_data(self, datum: Any, error: Optional[Any] = None) -> Tuple[Any, Any]: - """Format and validate the input. + def _format_data(self, gen_datum: Iterator) -> Generator: + """Validate and format the input. - Check that the given data and error has the correct structure. This method may - additionally change the data type, e.g. converting a list to a numpy array. + Check that the given data and error have the correct structure. Args: - datum: The data instance to check and format. - error: An optional error estimation on the datum to check and format. - - Returns: - datum, error: The formatted datum and its optional error. + gen_datum: A generator of unformatted data. 
Each entry is a tuple of data and error. - Raises: - DataProcessorError: If either the data or the error do not have the proper format. + Yields: + A tuple of formatted data and error. """ + yield from gen_datum - def __call__(self, data: Any, error: Optional[Any] = None) -> Tuple[Any, Any]: + def __call__(self, gen_datum: Iterator) -> Generator: """Call the data action of this node on the data and propagate the error. Args: - data: The data to process. The action nodes in the data processor will - raise errors if the data does not have the appropriate format. - error: An optional error estimation on the datum that can be further processed. + gen_datum: A generator of raw data. Each entry is a tuple of data and error. - Returns: - processed data: The data processed by self as a tuple of processed datum and - optionally the propagated error estimate. + Yields: + A generator that implements a data processing pipeline. """ - return self._process(*self._format_data(data, error)) + yield from self._process(self._format_data(gen_datum)) def __repr__(self): """String representation of the node.""" @@ -94,11 +86,12 @@ def is_trained(self) -> bool: """ @abstractmethod - def train(self, data: List[Any]): + def train(self, full_val_arr: np.ndarray, full_err_arr: Optional[np.ndarray] = None): """Train a DataAction. Certain data processing nodes, such as a SVD, require data to first train. Args: - data: A list of datum. Each datum is a point used to train the node. + full_val_arr: A list of values. Each datum will be converted to a 2D array. + full_err_arr: A list of errors. Each datum will be converted to a 2D array. 
""" diff --git a/qiskit_experiments/data_processing/data_processor.py b/qiskit_experiments/data_processing/data_processor.py index 4cb45d71ad..1daac91f15 100644 --- a/qiskit_experiments/data_processing/data_processor.py +++ b/qiskit_experiments/data_processing/data_processor.py @@ -12,7 +12,10 @@ """Actions done on the data to bring it in a usable form.""" -from typing import Any, Dict, List, Set, Tuple, Union +import itertools +from typing import Any, Dict, List, Set, Tuple, Union, Generator, Iterator + +import numpy as np from qiskit_experiments.data_processing.data_action import DataAction, TrainableDataAction from qiskit_experiments.data_processing.exceptions import DataProcessorError @@ -36,7 +39,7 @@ class DataProcessor: def __init__( self, input_key: str, - data_actions: List[DataAction] = None, + data_actions: Union[DataAction, TrainableDataAction] = None, ): """Create a chain of data processing actions. @@ -45,12 +48,11 @@ def __init__( will find the data to process. data_actions: A list of data processing actions to construct this data processor with. If None is given an empty DataProcessor will be created. - to_array: Boolean indicating if the input data will be converted to a numpy array. """ self._input_key = input_key self._nodes = data_actions if data_actions else [] - def append(self, node: DataAction): + def append(self, node: Union[DataAction, TrainableDataAction]): """ Append new data action node to this data processor. @@ -125,28 +127,45 @@ def _call_internal( then all nodes in the data processing chain will be called. Returns: - datum_ and history if with_history is True or datum_ if with_history is False. + When ``with_history`` is ``False`` it returns a tuple of array-like of data and error. + Otherwise it returns a tuple of above with a list of intermediate data at each step. 
""" if call_up_to_node is None: call_up_to_node = len(self._nodes) - datum_, error_ = self._data_extraction(data), None + # This is generator + gen_datum = self._data_extraction(data) history = [] - for index, node in enumerate(self._nodes): + for index, node in enumerate(self._nodes[:call_up_to_node]): + # Create pipeline of data processing + gen_datum = node(gen_datum) + + if with_history and (history_nodes is None or index in history_nodes): + # make sure not to kill pipeline by execution + gen_datum, gen_datum_copy = itertools.tee(gen_datum) + out_values, out_errors = execute_pipeline(gen_datum_copy) + history.append((node.__class__.__name__, out_values, out_errors, index)) + + # Execute pipeline + out_values, out_errors = execute_pipeline(gen_datum) - if index < call_up_to_node: - datum_, error_ = node(datum_, error_) + # Return only first element if length=1, e.g. [[0, 1]] -> [0, 1] + if out_values.shape[0] == 1: + out_values = out_values[0] - if with_history and ( - history_nodes is None or (history_nodes and index in history_nodes) - ): - history.append((node.__class__.__name__, datum_, error_, index)) + # Return only first element if length=1, e.g. [[0, 1]] -> [0, 1] + if out_errors.shape[0] == 1: + out_errors = out_errors[0] + + # Return None if error is not computed + if np.isnan(out_errors).all(): + out_errors = None if with_history: - return datum_, error_, history + return out_values, out_errors, history else: - return datum_, error_ + return out_values, out_errors def train(self, data: List[Dict[str, Any]]): """Train the nodes of the data processor. @@ -154,14 +173,13 @@ def train(self, data: List[Dict[str, Any]]): Args: data: The data to use to train the data processor. """ - for index, node in enumerate(self._nodes): if isinstance(node, TrainableDataAction): if not node.is_trained: # Process the data up to the untrained node. 
- node.train(self._call_internal(data, call_up_to_node=index)[0]) + node.train(*self._call_internal(data, call_up_to_node=index)) - def _data_extraction(self, data: Union[Dict, List[Dict]]) -> List: + def _data_extraction(self, data: Union[Dict, List[Dict]]) -> Generator: """Extracts the data on which to run the nodes. If the datum is a list of dicts then the data under self._input_key is extracted @@ -172,35 +190,84 @@ def _data_extraction(self, data: Union[Dict, List[Dict]]) -> List: Args: data: A list of such dicts where the data is contained under the key self._input_key. - Returns: - The data formatted in such a way that it is ready to be processed by the nodes. + Yields: + A tuple of numpy array object representing a data and error. Raises: DataProcessorError: - If the input datum is not a list or a dict. - - If the data processor received a single datum but requires all the data to - process it properly. - If the input key of the data processor is not contained in the data. """ if isinstance(data, dict): data = [data] - try: - data_ = [_datum[self._input_key] for _datum in iter(data)] - except KeyError as error: - raise DataProcessorError( - f"The input key {self._input_key} was not found in the input datum." - ) from error - except TypeError as error: - raise DataProcessorError( - f"{self.__class__.__name__} only extracts data from " - f"lists or dicts, received {type(data)}." - ) from error - - return data_ + for datum in data: + try: + target = datum[self._input_key] + + # returns data and initial error + if isinstance(target, dict): + # likely level2 data, forcibly convert into array + yield np.asarray([target], dtype=object), np.asarray([np.nan], dtype=float) + else: + try: + # level1 or below + nominal_arr = np.asarray(target, dtype=float) + stdev_arr = np.full_like(target, np.nan, dtype=float) + except TypeError: + # level2 memory ["00", "11", "01", ...] 
+ nominal_arr = np.asarray(target, dtype=object) + stdev_arr = np.asarray([np.nan], dtype=float) + yield nominal_arr, stdev_arr + + except KeyError as error: + raise DataProcessorError( + f"The input key {self._input_key} was not found in the input datum." + ) from error + except TypeError as error: + raise DataProcessorError( + f"{self.__class__.__name__} only extracts data from " + f"lists or dicts, received {type(data)}." + ) from error def __repr__(self): """String representation of data processors.""" names = ", ".join(node.__class__.__name__ for node in self._nodes) return f"{self.__class__.__name__}(input_key={self._input_key}, nodes=[{names}])" + + +def execute_pipeline(gen_datum: Iterator) -> Tuple[np.ndarray, np.ndarray]: + """Execute processing pipeline and return processed data array. + + Args: + gen_datum: A generator to sequentially return datum. + + Returns: + A tuple of nominal values and standard errors. + """ + out_values, out_errors = list(zip(*gen_datum)) + + try: + # try to convert into float object for performance + out_values = np.asarray(out_values, dtype=float) + except TypeError: + # if not convert into arbitrary array + out_values = np.asarray(out_values, dtype=object) + + # convert into 1D array e.g. [[0], [1], ...] -> [0, 1, ...] + if len(out_values.shape) == 2 and out_values.shape[1] == 1: + out_values = out_values[:, 0] + + try: + # try to convert into float object for performance + out_errors = np.asarray(out_errors, dtype=float) + except TypeError: + # if not convert into arbitrary array + out_errors = np.asarray(out_errors, dtype=object) + + # convert into 1D array e.g. [[0], [1], ...] -> [0, 1, ...] 
+ if len(out_errors.shape) == 2 and out_errors.shape[1] == 1: + out_errors = out_errors[:, 0] + + return out_values, out_errors diff --git a/qiskit_experiments/data_processing/nodes.py b/qiskit_experiments/data_processing/nodes.py index 24af0f3842..4307b98694 100644 --- a/qiskit_experiments/data_processing/nodes.py +++ b/qiskit_experiments/data_processing/nodes.py @@ -10,15 +10,18 @@ # copyright notice, and modified files need to carry a notice indicating # that they have been altered from the originals. +# pylint: disable=arguments-differ + """Different data analysis steps.""" -from abc import abstractmethod from numbers import Number -from typing import Any, Dict, List, Optional, Tuple, Union, Sequence +from typing import List, Tuple, Union, Sequence, Generator, Iterator, Optional + import numpy as np from qiskit_experiments.data_processing.data_action import DataAction, TrainableDataAction from qiskit_experiments.data_processing.exceptions import DataProcessorError +from .data_processor import execute_pipeline class AverageData(DataAction): @@ -30,76 +33,126 @@ def __init__(self, axis: int, validate: bool = True): Args: axis: The axis along which to average. validate: If set to False the DataAction will not validate its input. + + Note: + Axis depends on data type. ``axis = 0`` indicates averaging over + different circuits in the experiment data. Level2 data has only this axis. + As a reminder the table below shows that various data levels in Qiskit + and their dimension. The AverageData node will simply average the + given array over the specified axis. 
+ + ============ ============= ===== + `meas_level` `meas_return` shape + ============ ============= ===== + 0 `single` np.ndarray[shots, memory_slots, memory_slot_size] + 0 `avg` np.ndarray[memory_slots, memory_slot_size] + 1 `single` np.ndarray[shots, memory_slots] + 1 `avg` np.ndarray[memory_slots] + 2 `memory=True` list + ============ ============= ===== + """ super().__init__(validate) self._axis = axis - def _format_data(self, datum: Any, error: Optional[Any] = None): - """Format the data into numpy arrays.""" - datum = np.asarray(datum, dtype=float) + def _format_data(self, gen_datum: Iterator) -> Generator: + """Format and validate. - if self._validate: - if len(datum.shape) <= self._axis: - raise DataProcessorError( - f"Cannot average the {len(datum.shape)} dimensional " - f"array along axis {self._axis}." - ) + Args: + gen_datum: A pipeline. + + Yields: + A formatted value array. Error is discarded. - if error is not None: - error = np.asarray(error, dtype=float) + Raises: + DataProcessorError: When non-existing data axis is specified. + """ + for value_array, error_array in gen_datum: + value_array = np.asarray(value_array, dtype=float) + error_array = np.asarray(error_array, dtype=float) + + if self._validate: + # shape is reduced because this is a single entry + if len(value_array.shape) <= self._axis - 1: + raise DataProcessorError( + f"Cannot average the {len(value_array.shape)} dimensional " + f"array along axis {self._axis}." + ) - return datum, error + yield value_array, error_array - def _process( - self, datum: np.array, error: Optional[np.array] = None - ) -> Tuple[np.array, np.array]: + def _process(self, gen_datum: Iterator) -> Generator: """Average the data. Args: - datum: an array of data. + gen_datum: A pipeline. - Returns: + Yields: Two arrays with one less dimension than the given datum and error. The error is the standard error of the mean, i.e. 
the standard deviation of the datum divided by :math:`sqrt{N}` where :math:`N` is the number of data points. - - Raises: - DataProcessorError: If the axis is not an int. """ - standard_error = np.std(datum, axis=self._axis) / np.sqrt(datum.shape[self._axis]) + if self._axis == 0: + # average over different circuits. execute pipeline. + full_val_arr, _ = execute_pipeline(gen_datum) + n_circs = full_val_arr.shape[0] - return np.average(datum, axis=self._axis), standard_error + # take average over full matrix + avg_mat = np.average(full_val_arr, axis=self._axis) + std_mat = np.std(full_val_arr, axis=self._axis) / np.sqrt(n_circs) + + yield np.asarray(avg_mat, dtype=float), np.asarray(std_mat, dtype=float) + else: + # keep pipeline, e.g. averaging over shots of single circuit + for value_array, _ in gen_datum: + axis = self._axis - 1 + n_elements = value_array.shape[axis] + + avg_mat = np.average(value_array, axis=axis) + std_mat = np.std(value_array, axis=axis) / np.sqrt(n_elements) + + yield avg_mat, std_mat class MinMaxNormalize(DataAction): """Normalizes the data.""" - def _format_data(self, datum: Any, error: Optional[Any] = None): - """Format the data into numpy arrays.""" - datum = np.asarray(datum, dtype=float) + def _format_data(self, gen_datum: Iterator) -> Tuple[np.ndarray, np.ndarray]: + """Format and validate. - if error is not None: - error = np.asarray(error, dtype=float) + Args: + gen_datum: A pipeline. - return datum, error + Returns: + A tuple of formatted values and error arrays. + """ + full_val_arr, full_err_arr = execute_pipeline(gen_datum) + return np.asarray(full_val_arr, dtype=float), np.asarray(full_err_arr, dtype=float) - def _process( - self, datum: np.array, error: Optional[np.array] = None - ) -> Tuple[np.array, np.array]: - """Normalize the data to the interval [0, 1].""" - min_y, max_y = np.min(datum), np.max(datum) + def _process(self, full_arrays_tup: Tuple[np.ndarray, np.ndarray]) -> Generator: + """Normalize data. 
This node executes the pipeline and generates the full data array. - if error is not None: - return (datum - min_y) / (max_y - min_y), error / (max_y - min_y) - else: - return (datum - min_y) / (max_y - min_y), None + Args: + full_arrays_tup: Values and errors from the executed pipeline. + + Yields: + Values normalized to the interval [0, 1]. + """ + full_val_arr, full_err_arr = full_arrays_tup + + min_y, max_y = np.min(full_val_arr), np.max(full_val_arr) + scale = float(max_y) - float(min_y) + + for out_value, out_error in zip(full_val_arr, full_err_arr): + yield (out_value - min_y) / scale, out_error / scale class SVD(TrainableDataAction): """Singular Value Decomposition of averaged IQ data.""" def __init__(self, validate: bool = True): - """ + """Create new action. + Args: validate: If set to False the DataAction will not validate its input. """ @@ -107,38 +160,101 @@ def __init__(self, validate: bool = True): self._main_axes = None self._means = None self._scales = None + self._n_shots = 0 + self._n_slots = 0 + self._n_iq = 0 - def _format_data(self, datum: Any, error: Optional[Any] = None) -> Tuple[Any, Any]: - """Check that the IQ data is 2D and convert it to a numpy array. + def _format_data(self, gen_datum: Iterator) -> Generator: + """Format and validate. Args: - datum: A single item of data which corresponds to single-shot IQ data. + gen_datum: A pipeline. - Returns: - datum and any error estimate as a numpy array. + Yields: + A tuple of formatted data and error. Raises: DataProcessorError: If the datum does not have the correct format. 
""" - datum = np.asarray(datum, dtype=float) + for value_array, error_array in gen_datum: + value_array = np.asarray(value_array, dtype=float) + error_array = np.asarray(error_array, dtype=float) + self._n_shots = 0 + self._n_slots = 0 + self._n_iq = 0 + + # identify shape + try: + # level1 single mode + self._n_shots, self._n_slots, self._n_iq = value_array.shape + except ValueError: + try: + # level1 average mode + self._n_slots, self._n_iq = value_array.shape + except ValueError as ex: + raise DataProcessorError( + f"Data given to {self.__class__.__name__} is not likely level1 data." + ) from ex + + if self._validate: + if self._n_iq != 2: + raise DataProcessorError( + f"IQ data given to {self.__class__.__name__} must be a 2D array. " + f"Instead, a {self._n_iq}D array was given." + ) + if value_array.shape != error_array.shape: + raise DataProcessorError( + f"IQ data error given to {self.__class__.__name__} is invalid data shape." + ) - if error is not None: - error = np.asarray(error, dtype=float) + yield value_array, error_array - if self._validate: - if len(datum.shape) not in {2, 3}: - raise DataProcessorError( - f"IQ data given to {self.__class__.__name__} must be a 2D array. " - f"Instead, a {len(datum.shape)}D array was given." + def _process(self, gen_datum: Iterator) -> Generator: + """Compute singular values. + + Args: + gen_datum: A generator of unprocessed data. Each entry is a tuple of data and error. + + Yields: + A tuple of processed data and error. + + Raises: + DataProcessorError: If the SVD has not been previously trained on data. 
+ """ + if not self.is_trained: + raise DataProcessorError("SVD must be trained on data before it can be used.") + + for value_array, error_array in gen_datum: + if self._n_shots == 0: + # level1 average mode, IQ axis is projected + singular_vals = np.zeros(self._n_slots, dtype=float) + error_vals = np.zeros(self._n_slots, dtype=float) + else: + # level1 single mode, IQ axis is projected + singular_vals = np.zeros((self._n_shots, self._n_slots), dtype=float) + error_vals = np.zeros((self._n_shots, self._n_slots), dtype=float) + + # process each averaged IQ point with its own axis. + for idx in range(self._n_slots): + scale = self.scales[idx] + centered = np.array( + [ + value_array[..., idx, iq] - self.means(qubit=idx, iq_index=iq) + for iq in [0, 1] + ] ) + angle = np.arctan(self._main_axes[idx][1] / self._main_axes[idx][0]) - if error is not None and len(error.shape) not in {2, 3}: - raise DataProcessorError( - f"IQ data error given to {self.__class__.__name__} must be a 2D array." - f"Instead, a {len(error.shape)}D array was given." + singular_vals[..., idx] = (self._main_axes[idx] @ centered) / scale + error_vals[..., idx] = ( + np.sqrt( + (error_array[..., idx, 0] * np.cos(angle)) ** 2 + + (error_array[..., idx, 1] * np.sin(angle)) ** 2 + ) + / scale ) - return datum, error + yield singular_vals, error_vals @property def axis(self) -> List[np.array]: @@ -175,64 +291,7 @@ def is_trained(self) -> bool: """ return self._main_axes is not None - def _process( - self, datum: np.array, error: Optional[np.array] = None - ) -> Tuple[np.array, np.array]: - """Project the IQ data onto the axis defined by an SVD and scale it. - - Args: - datum: A 2D array of qubits, and an average complex IQ point as [real, imaginary]. - error: An optional 2D array of qubits, and an error on an average complex IQ - point as [real, imaginary]. - - Returns: - A Tuple of 1D arrays of the result of the SVD and the associated error. 
Each entry - is the real part of the averaged IQ data of a qubit. - - Raises: - DataProcessorError: If the SVD has not been previously trained on data. - """ - - if not self.is_trained: - raise DataProcessorError("SVD must be trained on data before it can be used.") - - n_qubits = datum.shape[0] if len(datum.shape) == 2 else datum.shape[1] - processed_data = [] - - if error is not None: - processed_error = [] - else: - processed_error = None - - # process each averaged IQ point with its own axis. - for idx in range(n_qubits): - - centered = np.array( - [datum[..., idx, iq] - self.means(qubit=idx, iq_index=iq) for iq in [0, 1]] - ) - - processed_data.append((self._main_axes[idx] @ centered) / self.scales[idx]) - - if error is not None: - angle = np.arctan(self._main_axes[idx][1] / self._main_axes[idx][0]) - error_value = np.sqrt( - (error[..., idx, 0] * np.cos(angle)) ** 2 - + (error[..., idx, 1] * np.sin(angle)) ** 2 - ) - processed_error.append(error_value / self.scales[idx]) - - if len(processed_data) == 1: - if error is None: - return processed_data[0], None - else: - return processed_data[0], processed_error[0] - - if error is None: - return np.array(processed_data), None - else: - return np.array(processed_data), np.array(processed_error) - - def train(self, data: List[Any]): + def train(self, full_val_arr: np.ndarray, full_err_arr: Optional[np.ndarray] = None): """Train the SVD on the given data. Each element of the given data will be converted to a 2D array of dimension @@ -243,19 +302,27 @@ def train(self, data: List[Any]): qubit so that future data points can be projected onto the axis. Args: - data: A list of datums. Each datum will be converted to a 2D array. + full_val_arr: A list of values. Each datum will be converted to a 2D array. + full_err_arr: A list of errors. Each datum will be converted to a 2D array. 
""" - if data is None: + if full_val_arr is None: return - n_qubits = self._format_data(data[0])[0].shape[0] + # Format + full_val_arr = np.asarray(full_val_arr, dtype=float) + + if full_err_arr is None: + full_err_arr = np.full_like(full_val_arr, np.nan, dtype=float) + + # TODO should consider error + _ = np.asarray(full_err_arr, dtype=float) self._main_axes = [] self._scales = [] self._means = [] - - for qubit_idx in range(n_qubits): - datums = np.vstack([self._format_data(datum)[0][qubit_idx] for datum in data]).T + n_slots = full_val_arr.shape[1] + for slot_idx in range(n_slots): + datums = np.vstack([datum[slot_idx] for datum in full_val_arr]).T # Calculate the mean of the data to recenter it in the IQ plane. mean_i = np.average(datums[0, :]) @@ -276,7 +343,8 @@ class IQPart(DataAction): """Abstract class for IQ data post-processing.""" def __init__(self, scale: float = 1.0, validate: bool = True): - """ + """Create new action. + Args: scale: Float with which to multiply the IQ data. Defaults to 1.0. validate: If set to False the DataAction will not validate its input. @@ -284,8 +352,7 @@ def __init__(self, scale: float = 1.0, validate: bool = True): self.scale = scale super().__init__(validate) - @abstractmethod - def _process(self, datum: np.array, error: Optional[np.array] = None) -> np.array: + def _process(self, gen_datum: Iterator) -> Generator: """Defines how the IQ point is processed. The dimension of the input datum corresponds to different types of data: @@ -294,52 +361,52 @@ def _process(self, datum: np.array, error: Optional[np.array] = None) -> np.arra - 4D represents all data of single-shot data. Args: - datum: A N dimensional array of complex IQ points as [real, imaginary]. - error: A N dimensional array of errors on complex IQ points as [real, imaginary]. + gen_datum: A generator of unprocessed data. Each entry is a tuple of data and error. - Returns: - Processed IQ point and its associated error estimate. 
+ Yields: + A tuple of processed data and error. """ + raise NotImplementedError + + def _format_data(self, gen_datum: Iterator) -> Generator: + """Validate and format the input. - def _format_data(self, datum: Any, error: Optional[Any] = None) -> Tuple[Any, Any]: - """Check that the IQ data has the correct format and convert to numpy array. + Check that the given data and error have the correct structure. Args: - datum: A single item of data which corresponds to single-shot IQ data. It's - dimension will depend on whether it is single-shot IQ data (three-dimensional) - or averaged IQ date (two-dimensional). + gen_datum: A generator of unformatted data. Each entry is a tuple of data and error. - Returns: - datum and any error estimate as a numpy array. + Yields: + A tuple of formatted data and error. Raises: DataProcessorError: If the datum does not have the correct format. """ - datum = np.asarray(datum, dtype=float) - - if error is not None: - error = np.asarray(error, dtype=float) + for value_array, error_array in gen_datum: + value_array = np.asarray(value_array, dtype=float) + error_array = np.asarray(error_array, dtype=float) - if self._validate: - if len(datum.shape) not in {2, 3, 4}: - raise DataProcessorError( - f"IQ data given to {self.__class__.__name__} must be an N dimensional" - f"array with N in (2, 3, 4). Instead, a {len(datum.shape)}D array was given." - ) - - if error is not None and len(error.shape) not in {2, 3, 4}: - raise DataProcessorError( - f"IQ data error given to {self.__class__.__name__} must be an N dimensional" - f"array with N in (2, 3, 4). Instead, a {len(error.shape)}D array was given." - ) + if self._validate: + if len(value_array.shape) not in {1, 2, 3}: + raise DataProcessorError( + f"IQ data given to {self.__class__.__name__} must be an N dimensional" + "array with N in (1, 2, 3). " + f"Instead, a {len(value_array.shape)}D array was given." 
+ ) - if error is not None and len(error.shape) != len(datum.shape): - raise DataProcessorError( - "Datum and error do not have the same shape: " - f"{len(datum.shape)} != {len(error.shape)}." - ) + if len(error_array.shape) not in {1, 2, 3}: + raise DataProcessorError( + f"IQ data error given to {self.__class__.__name__} must be an N dimensional" + "array with N in (1, 2, 3). " + f"Instead, a {len(error_array.shape)}D array was given." + ) - return datum, error + if len(error_array.shape) != len(value_array.shape): + raise DataProcessorError( + "Datum and error do not have the same shape: " + f"{len(value_array.shape)} != {len(error_array.shape)}." + ) + yield value_array, error_array def __repr__(self): """String representation of the node.""" @@ -349,46 +416,35 @@ def __repr__(self): class ToReal(IQPart): """IQ data post-processing. Isolate the real part of single-shot IQ data.""" - def _process( - self, datum: np.array, error: Optional[np.array] = None - ) -> Tuple[np.array, np.array]: + def _process(self, gen_datum: Iterator) -> Generator: """Take the real part of the IQ data. Args: - datum: An N dimensional array of shots, qubits, and a complex IQ point as - [real, imaginary]. - error: An N dimensional optional array of shots, qubits, and an error on a - complex IQ point as [real, imaginary]. + gen_datum: A generator of unprocessed data. Each entry is a tuple of data and error. - Returns: - A N-1 dimensional array, each entry is the real part of the given IQ data and error. + Yields: + A tuple of processed data and error. A N-1 dimensional array, + each entry is the real part of the given IQ data and error. """ - if error is not None: - return datum[..., 0] * self.scale, error[..., 0] * self.scale - else: - return datum[..., 0] * self.scale, None + for value_array, error_array in gen_datum: + yield value_array[..., 0] * self.scale, error_array[..., 0] * self.scale class ToImag(IQPart): """IQ data post-processing. 
Isolate the imaginary part of single-shot IQ data.""" - def _process(self, datum: np.array, error: Optional[np.array] = None) -> np.array: + def _process(self, gen_datum: Iterator) -> Generator: """Take the imaginary part of the IQ data. Args: - datum: An N dimensional array of shots, qubits, and a complex IQ point as - [real, imaginary]. - error: An N dimensional optional array of shots, qubits, and an error on a - complex IQ point as [real, imaginary]. + gen_datum: A generator of unprocessed data. Each entry is a tuple of data and error. - Returns: - A N-1 dimensional array, each entry is the imaginary part of the given IQ data - and error. + Yields: + A tuple of processed data and error. A N-1 dimensional array, + each entry is the imaginary part of the given IQ data and error. """ - if error is not None: - return datum[..., 1] * self.scale, error[..., 1] * self.scale - else: - return datum[..., 1] * self.scale, None + for value_array, error_array in gen_datum: + yield value_array[..., 1] * self.scale, error_array[..., 1] * self.scale class Probability(DataAction): @@ -461,83 +517,61 @@ def __init__( self._alpha_prior = list(alpha_prior) super().__init__(validate) - def _format_data(self, datum: dict, error: Optional[Any] = None) -> Tuple[dict, Any]: + def _format_data(self, gen_datum: Iterator) -> Generator: """ Checks that the given data has a counts format. Args: - datum: An instance of data the should be a dict with bit strings as keys - and counts as values. + gen_datum: A generator of unformatted data. Each entry is a tuple of data and error. - Returns: - The datum as given. + Yields: + The datum typecasted to dictionary. Raises: DataProcessorError: if the data is not a counts dict or a list of counts dicts. 
""" - if self._validate: - - if isinstance(datum, dict): - data = [datum] - elif isinstance(datum, list): - data = datum - else: - raise DataProcessorError(f"Datum must be dict or list, received {type(datum)}.") + for value_array, _ in gen_datum: + # Discard previous data. Probability is determined by sampling error. + # Any IQ distribution variance will be ignored at this stage. + count_dict = value_array[0] - for datum_ in data: - if not isinstance(datum_, dict): + if self._validate: + if not isinstance(count_dict, dict): raise DataProcessorError( - f"Given counts datum {datum_} to " + f"Given counts datum {count_dict} to " f"{self.__class__.__name__} is not a valid count format." ) - - for bit_str, count in datum_.items(): + for bit_str, count in count_dict.items(): if not isinstance(bit_str, str): raise DataProcessorError( - f"Key {bit_str} is not a valid count key in{self.__class__.__name__}." + f"Key {bit_str} is not a valid count key in {self.__class__.__name__}." ) - if not isinstance(count, (int, float, np.integer)): raise DataProcessorError( f"Count {bit_str} is not a valid count value in {self.__class__.__name__}." ) - return datum, None + yield count_dict + + def _process(self, gen_datum: Iterator) -> Generator: + """Compute probability and sampling error. - def _process( - self, - datum: Union[Dict[str, Any], List[Dict[str, Any]]], - error: Optional[Union[Dict, List]] = None, - ) -> Union[Tuple[float, float], Tuple[np.array, np.array]]: - """ Args: - datum: The data dictionary,taking the data under counts and - adding the corresponding probabilities. + gen_datum: A generator of unprocessed data. Each entry is a count dictionary. - Returns: - processed data: A dict with the populations and standard deviation. + Yields: + A tuple of processed data and error. 
""" - if isinstance(datum, dict): - return self._population_error(datum) - else: - populations, errors = [], [] - - for datum_ in datum: - pop, error = self._population_error(datum_) - populations.append(pop) - errors.append(error) + for count_dict in gen_datum: - return np.array(populations), np.array(errors) + shots = sum(count_dict.values()) + freq = count_dict.get(self._outcome, 0) + alpha_posterior = [freq + self._alpha_prior[0], shots - freq + self._alpha_prior[1]] + alpha_sum = sum(alpha_posterior) + p_mean = alpha_posterior[0] / alpha_sum + p_var = p_mean * (1 - p_mean) / (alpha_sum + 1) - def _population_error(self, counts_dict: Dict[str, int]) -> Tuple[float, float]: - """Helper method""" - shots = sum(counts_dict.values()) - freq = counts_dict.get(self._outcome, 0) - alpha_posterior = [freq + self._alpha_prior[0], shots - freq + self._alpha_prior[1]] - alpha_sum = sum(alpha_posterior) - p_mean = alpha_posterior[0] / alpha_sum - p_var = p_mean * (1 - p_mean) / (alpha_sum + 1) - return p_mean, np.sqrt(p_var) + yield np.asarray([p_mean], dtype=float), np.asarray([np.sqrt(p_var)], dtype=float) class BasisExpectationValue(DataAction): @@ -547,40 +581,38 @@ class BasisExpectationValue(DataAction): The sign becomes P(0) -> 1, P(1) -> -1. """ - def _format_data( - self, datum: np.ndarray, error: Optional[np.ndarray] = None - ) -> Tuple[Any, Any]: - """Check that the input data are probabilities. + def _format_data(self, gen_datum: Iterator) -> Generator: + """Validate and format the input. + + Check if given value is likely probability. Args: - datum: An array representing probabilities. - error: An array representing error. + gen_datum: A generator of unformatted data. Each entry is a tuple of data and error. - Returns: - Arrays of probability and its error + Yields: + A tuple of formatted data and error. 
Raises: DataProcessorError: When input value is not in [0, 1] """ - if not all(0.0 <= p <= 1.0 for p in datum): - raise DataProcessorError( - f"Input data for node {self.__class__.__name__} is not likely probability." - ) - return datum, error - - def _process( - self, datum: np.array, error: Optional[np.array] = None - ) -> Tuple[np.array, np.array]: + for value_array, error_array in gen_datum: + value = float(value_array) + error = float(error_array) + if self._validate: + if not 0 < value < 1: + raise DataProcessorError( + f"Input data for node {self.__class__.__name__} is not likely probability." + ) + yield value, error + + def _process(self, gen_datum: Iterator) -> Generator: """Compute eigenvalue. Args: - datum: An array representing probabilities. - error: An array representing error. + gen_datum: A generator of unprocessed data. Each entry is a tuple of data and error. - Returns: - Arrays of eigenvalues and its error + Yields: + A tuple of processed data and error. """ - if error is not None: - return 2 * (0.5 - datum), 2 * error - else: - return 2 * (0.5 - datum), None + for value, stdev in gen_datum: + yield np.asarray(2 * (0.5 - value), dtype=float), np.asarray(2 * stdev, dtype=float) diff --git a/test/data_processing/test_data_processing.py b/test/data_processing/test_data_processing.py index b1c2d690c2..59dd433d46 100644 --- a/test/data_processing/test_data_processing.py +++ b/test/data_processing/test_data_processing.py @@ -81,11 +81,11 @@ def test_empty_processor(self): data_processor = DataProcessor("counts") datum, error = data_processor(self.exp_data_lvl2.data(0)) - self.assertEqual(datum, [{"00": 4, "10": 6}]) + self.assertEqual(datum, {"00": 4, "10": 6}) self.assertIsNone(error) datum, error, history = data_processor.call_with_history(self.exp_data_lvl2.data(0)) - self.assertEqual(datum, [{"00": 4, "10": 6}]) + self.assertEqual(datum, {"00": 4, "10": 6}) self.assertEqual(history, []) def test_to_real(self): @@ -322,7 +322,7 @@ def setUp(self): 
[[0.9, 0.9], [-1.1, 1.0]], ] ) - self._sig_gs = np.array([[1.0], [-1.0]]) / np.sqrt(2.0) + self._sig_gs = np.array([1.0, -1.0]) / np.sqrt(2.0) circ_gs = ExperimentResultData( memory=[ @@ -332,7 +332,7 @@ def setUp(self): [[-0.9, -0.9], [1.1, -1.0]], ] ) - self._sig_es = np.array([[-1.0], [1.0]]) / np.sqrt(2.0) + self._sig_es = np.array([-1.0, 1.0]) / np.sqrt(2.0) circ_x90p = ExperimentResultData( memory=[ @@ -342,7 +342,7 @@ def setUp(self): [[1.0, 1.0], [-1.0, 1.0]], ] ) - self._sig_x90 = np.array([[0], [0]]) + self._sig_x90 = np.array([0, 0]) circ_x45p = ExperimentResultData( memory=[ @@ -352,7 +352,7 @@ def setUp(self): [[1.0, 1.0], [-1.0, 1.0]], ] ) - self._sig_x45 = np.array([[0.5], [-0.5]]) / np.sqrt(2.0) + self._sig_x45 = np.array([0.5, -0.5]) / np.sqrt(2.0) res_es = ExperimentResult( shots=4, @@ -460,7 +460,7 @@ def test_process_all_data(self): self._sig_x90.reshape(1, 2), self._sig_x45.reshape(1, 2), ) - ).T + ) # Test processing of all data processed = processor(self.data.data())[0] @@ -480,7 +480,7 @@ def test_normalize(self): processor.train([self.data.data(idx) for idx in [0, 1]]) self.assertTrue(processor.is_trained) - all_expected = np.array([[0.0, 1.0, 0.5, 0.75], [1.0, 0.0, 0.5, 0.25]]) + all_expected = np.array([[0.0, 1.0], [1.0, 0.0], [0.5, 0.5], [0.75, 0.25]]) # Test processing of all data processed = processor(self.data.data())[0] @@ -559,7 +559,7 @@ def test_normalize(self): processor.train([self.data.data(idx) for idx in [0, 1]]) self.assertTrue(processor.is_trained) - all_expected = np.array([[0.0, 1.0, 0.5, 0.75], [1.0, 0.0, 0.5, 0.25]]) + all_expected = np.array([[0.0, 1.0], [1.0, 0.0], [0.5, 0.5], [0.75, 0.25]]) # Test processing of all data processed = processor(self.data.data())[0] diff --git a/test/data_processing/test_nodes.py b/test/data_processing/test_nodes.py index 9832dcf533..afffc19053 100644 --- a/test/data_processing/test_nodes.py +++ b/test/data_processing/test_nodes.py @@ -28,22 +28,36 @@ from . 
import BaseDataProcessorTest +def processor_wrapper(node, values, errors=None): + """A helper function to execute node.""" + values = np.asarray(values, dtype=object) + if errors is None: + errors = np.full_like(values, np.nan, dtype=float) + errors = np.asarray(errors, dtype=float) + + node_out = list(node(zip(values, errors))) + return list(zip(*node_out)) + + class TestAveraging(BaseDataProcessorTest): """Test the averaging nodes.""" def test_simple(self): """Simple test of averaging.""" - - datum = np.array([[1, 2], [3, 4], [5, 6]]) + source = np.array([[1, 2], [3, 4], [5, 6]]) node = AverageData(axis=1) - self.assertTrue(np.allclose(node(datum)[0], np.array([1.5, 3.5, 5.5]))) - self.assertTrue(np.allclose(node(datum)[1], np.array([0.5, 0.5, 0.5]) / np.sqrt(2))) + node_out = processor_wrapper(node, source) + + self.assertTrue(np.allclose(node_out[0], np.array([1.5, 3.5, 5.5]))) + self.assertTrue(np.allclose(node_out[1], np.array([0.5, 0.5, 0.5]) / np.sqrt(2))) node = AverageData(axis=0) - self.assertTrue(np.allclose(node(datum)[0], np.array([3.0, 4.0]))) + node_out = processor_wrapper(node, source) + + self.assertTrue(np.allclose(node_out[0], np.array([3.0, 4.0]))) std = np.std([1, 3, 5]) - self.assertTrue(np.allclose(node(datum)[1], np.array([std, std]) / np.sqrt(3))) + self.assertTrue(np.allclose(node_out[1], np.array([std, std]) / np.sqrt(3))) def test_iq_averaging(self): """Test averaging of IQ-data.""" @@ -64,11 +78,9 @@ def test_iq_averaging(self): self.create_experiment(iq_data, single_shot=True) avg_iq = AverageData(axis=0) - - avg_datum, error = avg_iq(self.iq_experiment.data(0)["memory"]) + avg_datum, error = processor_wrapper(avg_iq, self.iq_experiment.data(0)["memory"]) expected_avg = np.array([[8.82943876e13, -1.27850527e15], [1.43410186e14, -3.89952402e15]]) - expected_std = np.array( [[5.07650185e14, 4.44664719e13], [1.40522641e15, 1.22326831e14]] ) / np.sqrt(10) @@ -91,9 +103,14 @@ def test_simple(self): node = MinMaxNormalize() - 
self.assertTrue(np.allclose(node(data)[0], expected_data)) - self.assertTrue(np.allclose(node(data, error)[0], expected_data)) - self.assertTrue(np.allclose(node(data, error)[1], expected_error)) + # error free data + node_out = processor_wrapper(node, data) + self.assertTrue(np.allclose(node_out[0], expected_data)) + + # error data + node_out = processor_wrapper(node, data, error) + self.assertTrue(np.allclose(node_out[0], expected_data)) + self.assertTrue(np.allclose(node_out[1], expected_error)) class TestSVD(BaseDataProcessorTest): @@ -118,15 +135,15 @@ def test_simple_data(self): # qubit 1 IQ data is oriented along (1, -1) self.assertTrue(np.allclose(iq_svd._main_axes[1], np.array([-1, 1]) / np.sqrt(2))) - processed, _ = iq_svd(np.array([[1, 1], [1, -1]])) + processed, _ = processor_wrapper(iq_svd, [[[1, 1], [1, -1]]]) expected = np.array([-1, -1]) / np.sqrt(2) self.assertTrue(np.allclose(processed, expected)) - processed, _ = iq_svd(np.array([[2, 2], [2, -2]])) + processed, _ = processor_wrapper(iq_svd, [[[2, 2], [2, -2]]]) self.assertTrue(np.allclose(processed, expected * 2)) # Check that orthogonal data gives 0. - processed, _ = iq_svd(np.array([[1, -1], [1, 1]])) + processed, _ = processor_wrapper(iq_svd, [[[1, -1], [1, 1]]]) expected = np.array([0, 0]) self.assertTrue(np.allclose(processed, expected)) @@ -166,18 +183,19 @@ def test_svd_error(self): iq_svd._means = [[0.0, 0.0]] # Since the axis is along the real part the imaginary error is irrelevant. - processed, error = iq_svd([[1.0, 0.2]], [[0.2, 0.1]]) + processed, error = processor_wrapper(iq_svd, [[[1.0, 0.2]]], [[[0.2, 0.1]]]) self.assertEqual(processed, np.array([1.0])) self.assertEqual(error, np.array([0.2])) # Since the axis is along the real part the imaginary error is irrelevant. 
- processed, error = iq_svd([[1.0, 0.2]], [[0.2, 0.3]]) + processed, error = processor_wrapper(iq_svd, [[[1.0, 0.2]]], [[[0.2, 0.3]]]) self.assertEqual(processed, np.array([1.0])) self.assertEqual(error, np.array([0.2])) # Tilt the axis to an angle of 36.9... degrees iq_svd._main_axes = np.array([[0.8, 0.6]]) - processed, error = iq_svd([[1.0, 0.0]], [[0.2, 0.3]]) + processed, error = processor_wrapper(iq_svd, [[[1.0, 0.0]]], [[[0.2, 0.3]]]) + cos_ = np.cos(np.arctan(0.6 / 0.8)) sin_ = np.sin(np.arctan(0.6 / 0.8)) self.assertEqual(processed, np.array([cos_])) @@ -215,14 +233,14 @@ def test_variance_not_zero(self): node = Probability(outcome="1") data = {"1": 1024, "0": 0} - mode, stderr = node(data) - self.assertGreater(stderr, 0.0) - self.assertLessEqual(mode, 1.0) + mean, stderr = processor_wrapper(node, [[data]]) + self.assertGreater(float(stderr[0]), 0.0) + self.assertLessEqual(float(mean[0]), 1.0) data = {"1": 0, "0": 1024} - mode, stderr = node(data) - self.assertGreater(stderr, 0.0) - self.assertGreaterEqual(mode, 0.0) + mean, stderr = processor_wrapper(node, [[data]]) + self.assertGreater(float(stderr[0]), 0.0) + self.assertGreaterEqual(float(mean[0]), 0.0) def test_probability_balanced(self): """Test if p=0.5 is returned when counts are balanced and prior is flat.""" @@ -230,5 +248,5 @@ def test_probability_balanced(self): # balanced counts with a flat prior will yield p = 0.5 data = {"1": 512, "0": 512} - mode, _ = node(data) - self.assertAlmostEqual(mode, 0.5) + mean, _ = processor_wrapper(node, [[data]]) + self.assertAlmostEqual(float(mean[0]), 0.5)