Skip to content

Pipeline data processor#474

Closed
nkanazawa1989 wants to merge 9 commits into
qiskit-community:mainfrom
nkanazawa1989:upgrade/data_processor_pipeline
Closed

Pipeline data processor#474
nkanazawa1989 wants to merge 9 commits into
qiskit-community:mainfrom
nkanazawa1989:upgrade/data_processor_pipeline

Conversation

@nkanazawa1989
Copy link
Copy Markdown
Collaborator

@nkanazawa1989 nkanazawa1989 commented Oct 29, 2021

Summary

Current data processor calls each DataAction node with a full data array with shape of [n_circuits, n_shots, n_slots, n_iq] at maximum. This consumes huge memory when we execute processing with large number of experiment data, or we set huge shot number with the single return mode (note that we can combine multiple execution results, thus this is not limited to max_circuits of backend).

This PR introduces a pipeline of node, to reduce required memory size to process each node, while keeping capability to call all data at once to run specific processing, e.g. min-max normalization, averaging, etc...

Details and comments

_process and __call__ method return a generator, and chain of processor node call will create gen = generator(generator(...)). This generator individually computes entries in the data list, and thus, in principle, it drastically improves memory efficiency.

This is core logic of new processor.

# This is generator
gen_datum = self._data_extraction(data)

history = []
for index, node in enumerate(self._nodes[:call_up_to_node]):
    # Create pipeline of data processing
    gen_datum = node(gen_datum)

    if with_history and (history_nodes is None or index in history_nodes):
        # make sure not to kill pipeline by execution
        gen_datum, gen_datum_copy = itertools.tee(gen_datum)
        out_values, out_errors = execute_pipeline(gen_datum_copy)
        history.append((node.__class__.__name__, out_values, out_errors, index))

# Execute pipeline
out_values, out_errors = execute_pipeline(gen_datum)

Full data array is not created until execute_pipeline function is called.

Minor:
I found an error in implementation of SVD node. Input data to the node is with shape of [n_circuits, n_slots, 2] (i.e. avg mode) but it returns [n_slots, n_circuits, 1]. This has not been affecting actual experiment since we don't have any experiment that simultaneously analyzes IQ data of more than 2 slots (e.g. 2Q RB on IQ data). This shape flip is also fixed in this PR.

knzwnao added 2 commits October 30, 2021 04:40
fix iterator

remove iterator/generator type hints

remove iterator/generator type hints

fix test

fix test

fix bug

wip

wip

wip

wip

wip

wip

wip

elint
*note*
current implementation run SVD on axis of circuits and stack this on axis of slots. this is opposite behavior to expected in this PR.
@nkanazawa1989 nkanazawa1989 requested a review from eggerdj October 29, 2021 19:57
Copy link
Copy Markdown
Contributor

@eggerdj eggerdj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into improvements to the data processor. Looking at the code I'm not yet confident that this PR improves the data-processing and any overhead. Currently, I do not yet understand how this is beneficial (see the specific comments). Here are a couple of requests to help along those lines:

  • Can you single out the SVD bug in a different PR and correct it first (with adequate tests to catch this). Btw: thanks for spotting this and addressing it!
  • restrict the current PR to the generator workflow. Here, we should carefully examine which nodes benefit from a generator and why. I'm a bit suspicious of some places where list is called on the generator. Adding more documentation to how the workflow proposed in this PR is beneficial and why will greatly help future developers and users understand the data processor workflow.

Comment on lines +144 to +148
if with_history and (history_nodes is None or index in history_nodes):
# make sure not to kill pipeline by execution
gen_datum, gen_datum_copy = itertools.tee(gen_datum)
out_values, out_errors = execute_pipeline(gen_datum_copy)
history.append((node.__class__.__name__, out_values, out_errors, index))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with tee. So it is hard to gauge how efficient this is. Looking at the docs https://docs.python.org/3/library/itertools.html#itertools.tee I find that they state

Once tee() has made a split, the original iterable should not be used anywhere else; 
otherwise, the iterable could get advanced without the tee objects being informed.

tee iterators are not threadsafe. 
A RuntimeError may be raised when using simultaneously iterators returned by the same tee() call, 
even if the original iterable is threadsafe.

This itertool may require significant auxiliary storage (depending on how much temporary data needs to be stored). 
In general, if one iterator uses most or all of the data before another iterator starts, 
it is faster to use list() instead of tee().

The last point here has me a bit worried: if we call with history it looks like gen_datum_copy will consume all the data through

out_values, out_errors = execute_pipeline(gen_datum_copy)

which may make it inefficient when calling

out_values, out_errors = execute_pipeline(gen_datum)

Have you tested the performance of this? Do we also have tests to make sure that we get the correct result when requiring the history?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for the comment below?

# make sure not to kill pipeline by execution

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, call with history becomes inefficient as you mentioned. The generator can be called only once, and once it's called, basically it returns empty list next time. The tee function copies generator to keep one un-executed while another one is executed to generate an intermediate state. I think call with history is kind of debug-option and we don't need to pursue performance for this. I cannot find use of this option in current codebase.

Comment thread qiskit_experiments/data_processing/data_processor.py
Comment thread qiskit_experiments/data_processing/data_processor.py Outdated
Comment on lines +236 to +245
def execute_pipeline(gen_datum: Iterator) -> Tuple[np.ndarray, np.ndarray]:
"""Execute processing pipeline and return processed data array.

Args:
gen_datum: A generator to sequentially return datum.

Returns:
A tuple of nominal values and standard errors.
"""
out_values, out_errors = list(zip(*gen_datum))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this getting an Iterator when we are immediately consuming it as a list? Would it not be better to give it a list directly?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this function is to run list(zip(*gen_datum)) (along with some formatting). A generator is a generator object and you cannot access each element until it is executed like this. Once we integrate error into value with uncertainties this can be return np.fromiter(gen_datum, dtype=object).

Comment thread qiskit_experiments/data_processing/nodes.py
yield from gen_datum

def __call__(self, data: Any, error: Optional[Any] = None) -> Tuple[Any, Any]:
def __call__(self, gen_datum: Iterator) -> Generator:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add more documentation to explain how this functions. With this change the interface is becoming more abstract and harder to follow. This therefore requires more careful explanation of how the DataAction functions.


@abstractmethod
def train(self, data: List[Any]):
def train(self, full_val_arr: np.ndarray, full_err_arr: Optional[np.ndarray] = None):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I have a node like a discriminator that trains on data with labels? E.g. IQ data? Perhaps the docs require more examples.

self,
input_key: str,
data_actions: List[DataAction] = None,
data_actions: Union[DataAction, TrainableDataAction] = None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change since a TrainableDataAction is a DataAction?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found sometime IDE claims .train method is not found in DataAction. Explicitly writing this helps IDE to remove these warnings but this is really minor part. This change can be removed if you want.

self._nodes = data_actions if data_actions else []

def append(self, node: DataAction):
def append(self, node: Union[DataAction, TrainableDataAction]):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above.


Args:
datum: A single item of data which corresponds to single-shot IQ data.
gen_datum: A pipeline.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs more doc

nkanazawa1989 and others added 3 commits November 1, 2021 23:36
Co-authored-by: Daniel J. Egger <38065505+eggerdj@users.noreply.github.com>
…a1989/qiskit-experiments into upgrade/data_processor_pipeline
@nkanazawa1989
Copy link
Copy Markdown
Collaborator Author

nkanazawa1989 commented Nov 1, 2021

Thanks Daniel, I was thinking Yield has better performance in spirit of

https://www.guru99.com/python-yield-return-generator.html

* Use yield instead of return when the data size is large
* Yield is the best choice when you need your execution to be faster on large data sets
* Use yield when you want to return a big set of values to the calling function
* Yield is an efficient way of producing data that is big or infinite.

However, in our use case usually averaing node will be called first and data size will be immediately reduced. In this case, looping computation over circuit data will be overhead.

This is the code for benchmark data preparation. This generates data with arbitrary n_slots, n_shots, n_circuits.

import numpy as np

from qiskit_experiments.data_processing.data_processor import DataProcessor
from qiskit_experiments.data_processing.nodes import AverageData, SVD, MinMaxNormalize

def gen_data(n_circs=20, n_slots=50, n_shots=1000):
    rng = np.random.default_rng(12345)

    # generate center position, assume they are aligned along I quadrature
    iq0 = (100 * rng.random(size=[n_slots, 2], dtype=float)).tolist()
    iq1 = list([[-i, q] for i, q in iq0])

    # generate huge single shot memory data
    results = []
    for _ in range(n_circs):
        # |0>
        mem_circs = []
        for _ in range(n_shots):
            mem_slots = []
            for i, q in iq0:
                si, sq = 10 * rng.random(size=2, dtype=float)
                mem_slots.append([i + si, q + sq])
            mem_circs.append(mem_slots)
        results.append({"memory": mem_circs})
        # |1>
        mem_circs = []
        for _ in range(n_shots):
            mem_slots = []
            for i, q in iq1:
                si, sq = 10 * rng.random(size=2, dtype=float)
                mem_slots.append([i + si, q + sq])
            mem_circs.append(mem_slots)
        results.append({"memory": mem_circs})
    
    return results

results = gen_data(n_shots=1000)

benchmark (note that this is really rough benchmark on notebook)

%%time
processor = DataProcessor("memory", [AverageData(axis=1), SVD(), MinMaxNormalize()])
# first two is training data
processor.train(results[:2])
# rest of data is to be analyzed
processed = processor(results[2:])

shots = 1000

full array generation (now):

CPU times: user 2.56 s, sys: 142 ms, total: 2.7 s
Wall time: 2.68 s

generator:

CPU times: user 6.82 s, sys: 180 ms, total: 7 s
Wall time: 6.99 s

If we have multiple nodes before the average, probably generator will have better performance. But seems like this PR is overkill for now.

@nkanazawa1989 nkanazawa1989 added the on hold On hold until something else is done. label Nov 2, 2021
@CLAassistant
Copy link
Copy Markdown

CLAassistant commented Jul 18, 2023

CLA assistant check
All committers have signed the CLA.

@wshanks wshanks closed this Aug 12, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

on hold On hold until something else is done.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants