Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 24 additions & 31 deletions qiskit_experiments/data_processing/data_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
"""Defines the steps that can be used to analyse data."""

from abc import ABCMeta, abstractmethod
from typing import Any, List, Optional, Tuple
from typing import Generator, Iterator, Optional

import numpy as np


class DataAction(metaclass=ABCMeta):
Expand All @@ -29,50 +31,40 @@ def __init__(self, validate: bool = True):
"""
self._validate = validate

@abstractmethod
def _process(self, datum: Any, error: Optional[Any] = None) -> Tuple[Any, Any]:
"""
Applies the data processing step to the datum.
def _process(self, gen_datum: Iterator) -> Generator:
"""Applies the data processing step to the datum.

Args:
datum: A single item of data which will be processed.
error: An optional error estimation on the datum that can be further propagated.
gen_datum: A generator of unprocessed data. Each entry is a tuple of data and error.

Returns:
processed data: The data that has been processed along with the propagated error.
Yields:
A tuple of processed data and error.
"""
yield from gen_datum

@abstractmethod
def _format_data(self, datum: Any, error: Optional[Any] = None) -> Tuple[Any, Any]:
"""Format and validate the input.
def _format_data(self, gen_datum: Iterator) -> Generator:
"""Validate and format the input.

Check that the given data and error has the correct structure. This method may
additionally change the data type, e.g. converting a list to a numpy array.
Check that the given data and error have the correct structure.

Args:
datum: The data instance to check and format.
error: An optional error estimation on the datum to check and format.

Returns:
datum, error: The formatted datum and its optional error.
gen_datum: A generator of unformatted data. Each entry is a tuple of data and error.

Raises:
DataProcessorError: If either the data or the error do not have the proper format.
Yields:
A tuple of formatted data and error.
"""
yield from gen_datum

def __call__(self, data: Any, error: Optional[Any] = None) -> Tuple[Any, Any]:
def __call__(self, gen_datum: Iterator) -> Generator:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add more documentation to explain how this functions. With this change the interface is becoming more abstract and harder to follow. This therefore requires more careful explanation of how the DataAction functions.

"""Call the data action of this node on the data and propagate the error.

Args:
data: The data to process. The action nodes in the data processor will
raise errors if the data does not have the appropriate format.
error: An optional error estimation on the datum that can be further processed.
gen_datum: A generator of raw data. Each entry is a tuple of data and error.

Returns:
processed data: The data processed by self as a tuple of processed datum and
optionally the propagated error estimate.
Yields:
A generator that implements a data processing pipeline.
"""
return self._process(*self._format_data(data, error))
yield from self._process(self._format_data(gen_datum))

def __repr__(self):
"""String representation of the node."""
Expand All @@ -94,11 +86,12 @@ def is_trained(self) -> bool:
"""

@abstractmethod
def train(self, data: List[Any]):
def train(self, full_val_arr: np.ndarray, full_err_arr: Optional[np.ndarray] = None):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I have a node like a discriminator that trains on data with labels? E.g. IQ data? Perhaps the docs require more examples.

"""Train a DataAction.

Certain data processing nodes, such as a SVD, require data to first train.

Args:
data: A list of datum. Each datum is a point used to train the node.
full_val_arr: A list of values. Each datum will be converted to a 2D array.
full_err_arr: A list of errors. Each datm will be converted to a 2D array.
"""
137 changes: 102 additions & 35 deletions qiskit_experiments/data_processing/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@

"""Actions done on the data to bring it in a usable form."""

from typing import Any, Dict, List, Set, Tuple, Union
import itertools
from typing import Any, Dict, List, Set, Tuple, Union, Generator, Iterator

import numpy as np

from qiskit_experiments.data_processing.data_action import DataAction, TrainableDataAction
from qiskit_experiments.data_processing.exceptions import DataProcessorError
Expand All @@ -36,7 +39,7 @@ class DataProcessor:
def __init__(
self,
input_key: str,
data_actions: List[DataAction] = None,
data_actions: Union[DataAction, TrainableDataAction] = None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change since a TrainableDataAction is a DataAction?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found sometime IDE claims .train method is not found in DataAction. Explicitly writing this helps IDE to remove these warnings but this is really minor part. This change can be removed if you want.

):
"""Create a chain of data processing actions.

Expand All @@ -45,12 +48,11 @@ def __init__(
will find the data to process.
data_actions: A list of data processing actions to construct this data processor with.
If None is given an empty DataProcessor will be created.
to_array: Boolean indicating if the input data will be converted to a numpy array.
"""
self._input_key = input_key
self._nodes = data_actions if data_actions else []

def append(self, node: DataAction):
def append(self, node: Union[DataAction, TrainableDataAction]):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above.

"""
Append new data action node to this data processor.

Expand Down Expand Up @@ -125,43 +127,59 @@ def _call_internal(
then all nodes in the data processing chain will be called.

Returns:
datum_ and history if with_history is True or datum_ if with_history is False.
When ``with_history`` is ``False`` it returns a tuple of array-like of data and error.
Otherwise it returns a tuple of above with a list of intermediate data at each step.
"""
if call_up_to_node is None:
call_up_to_node = len(self._nodes)

datum_, error_ = self._data_extraction(data), None
# This is generator
gen_datum = self._data_extraction(data)

history = []
for index, node in enumerate(self._nodes):
for index, node in enumerate(self._nodes[:call_up_to_node]):
# Create pipeline of data processing
gen_datum = node(gen_datum)

if with_history and (history_nodes is None or index in history_nodes):
# make sure not to kill pipeline by execution
gen_datum, gen_datum_copy = itertools.tee(gen_datum)
out_values, out_errors = execute_pipeline(gen_datum_copy)
history.append((node.__class__.__name__, out_values, out_errors, index))
Comment on lines +144 to +148
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with tee. So it is hard to gauge how efficient this is. Looking at the docs https://docs.python.org/3/library/itertools.html#itertools.tee I find that they state

Once tee() has made a split, the original iterable should not be used anywhere else; 
otherwise, the iterable could get advanced without the tee objects being informed.

tee iterators are not threadsafe. 
A RuntimeError may be raised when using simultaneously iterators returned by the same tee() call, 
even if the original iterable is threadsafe.

This itertool may require significant auxiliary storage (depending on how much temporary data needs to be stored). 
In general, if one iterator uses most or all of the data before another iterator starts, 
it is faster to use list() instead of tee().

The last point here has me a bit worried: if we call with history it looks like gen_datum_copy will consume all the data through

out_values, out_errors = execute_pipeline(gen_datum_copy)

which may make it inefficient when calling

out_values, out_errors = execute_pipeline(gen_datum)

Have you tested the performance of this? Do we also have tests to make sure that we get the correct result when requiring the history?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for the comment below?

# make sure not to kill pipeline by execution

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, call with history becomes inefficient as you mentioned. The generator can be called only once, and once it's called, basically it returns empty list next time. The tee function copies generator to keep one un-executed while another one is executed to generate an intermediate state. I think call with history is kind of debug-option and we don't need to pursue performance for this. I cannot find use of this option in current codebase.


# Execute pipeline
out_values, out_errors = execute_pipeline(gen_datum)

if index < call_up_to_node:
datum_, error_ = node(datum_, error_)
# Return only first element if length=1, e.g. [[0, 1]] -> [0, 1]
if out_values.shape[0] == 1:
out_values = out_values[0]

if with_history and (
history_nodes is None or (history_nodes and index in history_nodes)
):
history.append((node.__class__.__name__, datum_, error_, index))
# Return only first element if length=1, e.g. [[0, 1]] -> [0, 1]
if out_errors.shape[0] == 1:
out_errors = out_errors[0]

# Return None if error is not computed
if np.isnan(out_errors).all():
out_errors = None

if with_history:
return datum_, error_, history
return out_values, out_errors, history
else:
return datum_, error_
return out_values, out_errors

def train(self, data: List[Dict[str, Any]]):
"""Train the nodes of the data processor.

Args:
data: The data to use to train the data processor.
"""

for index, node in enumerate(self._nodes):
if isinstance(node, TrainableDataAction):
if not node.is_trained:
# Process the data up to the untrained node.
node.train(self._call_internal(data, call_up_to_node=index)[0])
node.train(*self._call_internal(data, call_up_to_node=index))

def _data_extraction(self, data: Union[Dict, List[Dict]]) -> List:
def _data_extraction(self, data: Union[Dict, List[Dict]]) -> Generator:
"""Extracts the data on which to run the nodes.

If the datum is a list of dicts then the data under self._input_key is extracted
Expand All @@ -172,35 +190,84 @@ def _data_extraction(self, data: Union[Dict, List[Dict]]) -> List:
Args:
data: A list of such dicts where the data is contained under the key self._input_key.

Returns:
The data formatted in such a way that it is ready to be processed by the nodes.
Yields:
A tuple of numpy array object representing a data and error.

Raises:
DataProcessorError:
- If the input datum is not a list or a dict.
- If the data processor received a single datum but requires all the data to
process it properly.
- If the input key of the data processor is not contained in the data.
"""
if isinstance(data, dict):
data = [data]

try:
data_ = [_datum[self._input_key] for _datum in iter(data)]
except KeyError as error:
raise DataProcessorError(
f"The input key {self._input_key} was not found in the input datum."
) from error
except TypeError as error:
raise DataProcessorError(
f"{self.__class__.__name__} only extracts data from "
f"lists or dicts, received {type(data)}."
) from error

return data_
for datum in data:
try:
target = datum[self._input_key]

# returns data and initial error
if isinstance(target, dict):
# likely level2 data, forcibly convert into array
Comment thread
nkanazawa1989 marked this conversation as resolved.
yield np.asarray([target], dtype=object), np.asarray([np.nan], dtype=float)
else:
try:
# level1 or below
nominal_arr = np.asarray(target, dtype=float)
stdev_arr = np.full_like(target, np.nan, dtype=float)
except TypeError:
# level2 memory ["00", "11", "01", ...]
nominal_arr = np.asarray(target, dtype=object)
stdev_arr = np.asarray([np.nan], dtype=float)
yield nominal_arr, stdev_arr

except KeyError as error:
raise DataProcessorError(
f"The input key {self._input_key} was not found in the input datum."
) from error
except TypeError as error:
raise DataProcessorError(
f"{self.__class__.__name__} only extracts data from "
f"lists or dicts, received {type(data)}."
) from error

def __repr__(self):
"""String representation of data processors."""
names = ", ".join(node.__class__.__name__ for node in self._nodes)

return f"{self.__class__.__name__}(input_key={self._input_key}, nodes=[{names}])"


def execute_pipeline(gen_datum: Iterator) -> Tuple[np.ndarray, np.ndarray]:
"""Execute processing pipeline and return processed data array.

Args:
gen_datum: A generator to sequentially return datum.

Returns:
A tuple of nominal values and standard errors.
"""
out_values, out_errors = list(zip(*gen_datum))
Comment on lines +240 to +249
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this getting an Iterator when we are immediately consuming it as a list? Would it not be better to give it a list directly?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this function is to run list(zip(*gen_datum)) (along with some formatting). A generator is a generator object and you cannot access each element until it is executed like this. Once we integrate error into value with uncertainties this can be return np.fromiter(gen_datum, dtype=object).


try:
# try to convert into float object for performance
out_values = np.asarray(out_values, dtype=float)
except TypeError:
# if not convert into arbitrary array
out_values = np.asarray(out_values, dtype=object)

# convert into 1D array e.g. [[0], [1], ...] -> [0, 1, ...]
if len(out_values.shape) == 2 and out_values.shape[1] == 1:
out_values = out_values[:, 0]

try:
# try to convert into float object for performance
out_errors = np.asarray(out_errors, dtype=float)
except TypeError:
# if not convert into arbitrary array
out_errors = np.asarray(out_errors, dtype=object)

# convert into 1D array e.g. [[0], [1], ...] -> [0, 1, ...]
if len(out_errors.shape) == 2 and out_errors.shape[1] == 1:
out_errors = out_errors[:, 0]

return out_values, out_errors
Loading