diff --git a/docs/apidocs/database_service.rst b/docs/apidocs/database_service.rst new file mode 100644 index 0000000000..b7be621bef --- /dev/null +++ b/docs/apidocs/database_service.rst @@ -0,0 +1,6 @@ +.. _qiskit-experiments: + +.. automodule:: qiskit_experiments.database_service + :no-members: + :no-inherited-members: + :no-special-members: \ No newline at end of file diff --git a/docs/apidocs/index.rst b/docs/apidocs/index.rst index c6f43eb75a..732e3874ee 100644 --- a/docs/apidocs/index.rst +++ b/docs/apidocs/index.rst @@ -14,4 +14,5 @@ Qiskit Experiments API Reference tomography analysis data_processing + database_service \ No newline at end of file diff --git a/qiskit_experiments/database_service/__init__.py b/qiskit_experiments/database_service/__init__.py new file mode 100644 index 0000000000..4ee59e18b0 --- /dev/null +++ b/qiskit_experiments/database_service/__init__.py @@ -0,0 +1,95 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +""" +============================================================= +Database Service (:mod:`qiskit_experiments.database_service`) +============================================================= + +.. currentmodule:: qiskit_experiments.database_service + +This subpackage contains classes used to define the data structure of +an experiment, including its data, metadata, analysis results, and figures, as +well as the interface to an experiment database service. An experiment database +service allows one to store, retrieve, and query experiment related data. + +:class:`DbExperimentDataV1` is the main class that defines the structure of +experiment data, which consists of the following: + + * Results from circuit execution, which is called ``data`` in this class. + The :meth:`DbExperimentDataV1.add_data` + method allows you to add circuit jobs and job results. If jobs are added, + the method asynchronously waits for them to finish and extracts job results. + :meth:`DbExperimentDataV1.data` can then be used to retrieve this data. + Note that this data is not saved in the database. It is included in this + class only for convenience. + + * Experiment metadata. This is a freeform keyword-value dictionary. You can + use this to save extra information, such as the physical qubits the experiment + operated on, in the database. :meth:`DbExperimentDataV1.set_metadata` and + :meth:`DbExperimentDataV1.metadata` are methods to set and retrieve metadata, + respectively. + + * Analysis results. It is likely that some analysis is to be done on the + experiment data once the circuit jobs finish, and the result of this + analysis can be stored in the database. Similar to ``DbExperimentDataV1``, + :class:`DbAnalysisResultV1` defines the data structure of an analysis + result and provides methods to interface with the database. Being a separate + class, :class:`DbAnalysisResultV1` allows you to modify an analysis result + without modifying the experiment data. + + * Figures. Some analysis functions also generate figures, which can also be + saved in the database. + +:class:`DatabaseServiceV1` provides low-level abstract interface for accessing the +database, such as :meth:`DatabaseServiceV1.create_experiment` for creating a +new experiment entry and :meth:`DatabaseServiceV1.update_experiment` for +updating an existing entry. :class:`DbExperimentDataV1` has methods that wrap +around some of these low-level database methods. For example, +:meth:`DbExperimentDataV1.save` calls :meth:`DatabaseServiceV1.create_experiment` +under the cover to save experiment related data. The low-level methods are only +expected to be used when you want to interact with the database directly - for +example, to retrieve a saved analysis result. + +Currently only IBM Quantum provides this database service. See +`qiskit-ibmq-provider `_ +for more details. + +Classes +======= + +.. autosummary:: + :toctree: ../stubs/ + + DbExperimentData + DbExperimentDataV1 + DbAnalysisResult + DbAnalysisResultV1 + DatabaseService + DatabaseServiceV1 + + +Exceptions +========== + +.. autosummary:: + :toctree: ../stubs/ + + DbExperimentDataError + DbExperimentEntryExists + DbExperimentEntryNotFound +""" + +from .db_experiment_data import DbExperimentData, DbExperimentDataV1 +from .db_analysis_result import DbAnalysisResult, DbAnalysisResultV1 +from .database_service import DatabaseService, DatabaseServiceV1 +from .exceptions import DbExperimentDataError, DbExperimentEntryExists, DbExperimentEntryNotFound diff --git a/qiskit_experiments/database_service/database_service.py b/qiskit_experiments/database_service/database_service.py new file mode 100644 index 0000000000..421b80a914 --- /dev/null +++ b/qiskit_experiments/database_service/database_service.py @@ -0,0 +1,417 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Experiment database service abstract interface.""" + +from abc import ABC, abstractmethod +from typing import Optional, Dict, List, Any, Union, Tuple + +from qiskit.providers import Options + +from .device_component import DeviceComponent + + +class DatabaseService: + """Base common type for all versioned DatabaseService abstract classes. + + Note this class should not be inherited from directly, it is intended + to be used for type checking. When implementing a subclass you should use + the versioned abstract classes as the parent class and not this class + directly. + """ + + version = 0 + + +class DatabaseServiceV1(DatabaseService, ABC): + """Interface for providing experiment database service. + + This class defines the interface ``qiskit_experiments`` expects from an + experiment database service. + + An experiment database service allows you to store experiment data and metadata + in a database. An experiment can have one or more jobs, analysis results, + and figures. + + Each implementation of this service may use different data structure and + should issue a warning on unsupported keywords. + """ + + version = 1 + + def __init__(self): + """Initialize an DatabaseService instance.""" + self._options = self._default_options() + + @classmethod + @abstractmethod + def _default_options(cls) -> Options: + """Return the default options + + Returns: + Default options. + """ + pass + + @abstractmethod + def create_experiment( + self, + experiment_type: str, + backend_name: str, + metadata: Optional[Dict] = None, + experiment_id: Optional[str] = None, + job_ids: Optional[List[str]] = None, + tags: Optional[List[str]] = None, + notes: Optional[str] = None, + **kwargs: Any, + ) -> str: + """Create a new experiment in the database. + + Args: + experiment_type: Experiment type. + backend_name: Name of the backend the experiment ran on. + metadata: Experiment metadata. + experiment_id: Experiment ID. It must be in the ``uuid4`` format. + One will be generated if not supplied. + job_ids: IDs of experiment jobs. + tags: Tags to be associated with the experiment. + notes: Freeform notes about the experiment. + kwargs: Additional keywords supported by the service provider. + + Returns: + Experiment ID. + + Raises: + ExperimentEntryExists: If the experiment already exits. + """ + pass + + @abstractmethod + def update_experiment( + self, + experiment_id: str, + metadata: Optional[Dict] = None, + job_ids: Optional[List[str]] = None, + notes: Optional[str] = None, + tags: Optional[List[str]] = None, + **kwargs: Any, + ) -> None: + """Update an existing experiment. + + Args: + experiment_id: Experiment ID. + metadata: Experiment metadata. + job_ids: IDs of experiment jobs. + notes: Freeform notes about the experiment. + tags: Tags to be associated with the experiment. + kwargs: Additional keywords supported by the service provider. + + Raises: + ExperimentEntryNotFound: If the experiment does not exist. + """ + pass + + @abstractmethod + def experiment(self, experiment_id: str) -> Dict: + """Retrieve a previously stored experiment. + + Args: + experiment_id: Experiment ID. + + Returns: + A dictionary containing the retrieved experiment data. + + Raises: + ExperimentEntryNotFound: If the experiment does not exist. + """ + pass + + @abstractmethod + def experiments( + self, + limit: Optional[int] = 10, + device_components: Optional[Union[str, DeviceComponent]] = None, + experiment_type: Optional[str] = None, + backend_name: Optional[str] = None, + tags: Optional[List[str]] = None, + tags_operator: Optional[str] = "OR", + **filters: Any, + ) -> List[Dict]: + """Retrieve all experiment data, with optional filtering. + + Args: + limit: Number of experiment data entries to retrieve. ``None`` means no limit. + device_components: Filter by device components. An experiment must have analysis + results with device components matching the given list exactly to be included. + experiment_type: Experiment type used for filtering. + backend_name: Backend name used for filtering. + tags: Filter by tags assigned to experiments. This can be used + with `tags_operator` for granular filtering. + tags_operator: Logical operator to use when filtering by tags. Valid + values are "AND" and "OR": + + * If "AND" is specified, then an experiment must have all of the tags + specified in `tags` to be included. + * If "OR" is specified, then an experiment only needs to have any + of the tags specified in `tags` to be included. + + **filters: Additional filtering keywords supported by the service provider. + + Returns: + A list of experiments. + """ + pass + + @abstractmethod + def delete_experiment(self, experiment_id: str) -> None: + """Delete an experiment. + + Args: + experiment_id: Experiment ID. + """ + pass + + @abstractmethod + def create_analysis_result( + self, + experiment_id: str, + data: Dict, + result_type: str, + device_components: Optional[Union[str, DeviceComponent]] = None, + tags: Optional[List[str]] = None, + quality: Optional[str] = None, + verified: bool = False, + result_id: Optional[str] = None, + **kwargs: Any, + ) -> str: + """Create a new analysis result in the database. + + Args: + experiment_id: ID of the experiment this result is for. + data: Result data to be stored. + result_type: Analysis result type. + device_components: Target device components, such as qubits. + tags: Tags to be associated with the analysis result. + quality: Quality of this analysis. + verified: Whether the result quality has been verified. + result_id: Analysis result ID. It must be in the ``uuid4`` format. + One will be generated if not supplied. + kwargs: Additional keywords supported by the service provider. + + Returns: + Analysis result ID. + + Raises: + ExperimentEntryExists: If the analysis result already exits. + """ + pass + + @abstractmethod + def update_analysis_result( + self, + result_id: str, + data: Optional[Dict] = None, + tags: Optional[List[str]] = None, + quality: Optional[str] = None, + verified: bool = None, + **kwargs: Any, + ) -> None: + """Update an existing analysis result. + + Args: + result_id: Analysis result ID. + data: Result data to be stored. + quality: Quality of this analysis. + verified: Whether the result quality has been verified. + tags: Tags to be associated with the analysis result. + kwargs: Additional keywords supported by the service provider. + + Raises: + ExperimentEntryNotFound: If the analysis result does not exist. + """ + pass + + @abstractmethod + def analysis_result(self, result_id: str) -> Dict: + """Retrieve a previously stored experiment. + + Args: + result_id: Analysis result ID. + + Returns: + Retrieved analysis result. + + Raises: + ExperimentEntryNotFound: If the analysis result does not exist. + """ + pass + + @abstractmethod + def analysis_results( + self, + limit: Optional[int] = 10, + device_components: Optional[Union[str, DeviceComponent]] = None, + experiment_id: Optional[str] = None, + result_type: Optional[str] = None, + backend_name: Optional[str] = None, + quality: Optional[str] = None, + verified: Optional[bool] = None, + tags: Optional[List[str]] = None, + tags_operator: Optional[str] = "OR", + **filters: Any, + ) -> List[Dict]: + """Retrieve all analysis results, with optional filtering. + + Args: + limit: Number of analysis results to retrieve. ``None`` means no limit. + device_components: Target device components, such as qubits. + experiment_id: Experiment ID used for filtering. + result_type: Analysis result type used for filtering. + backend_name: Backend name used for filtering. If specified, analysis + results associated with experiments on that backend are returned. + quality: Quality value used for filtering. + verified: Whether the result quality has been verified. + tags: Filter by tags assigned to analysis results. This can be used + with `tags_operator` for granular filtering. + tags_operator: Logical operator to use when filtering by tags. Valid + values are "AND" and "OR": + + * If "AND" is specified, then an analysis result must have all of the tags + specified in `tags` to be included. + * If "OR" is specified, then an analysis result only needs to have any + of the tags specified in `tags` to be included. + + **filters: Additional filtering keywords supported by the service provider. + + Returns: + A list of analysis results. + """ + pass + + @abstractmethod + def delete_analysis_result(self, result_id: str) -> None: + """Delete an analysis result. + + Args: + result_id: Analysis result ID. + """ + pass + + @abstractmethod + def create_figure( + self, experiment_id: str, figure: Union[str, bytes], figure_name: Optional[str] + ) -> Tuple[str, int]: + """Store a new figure in the database. + + Args: + experiment_id: ID of the experiment this figure is for. + figure: Path of the figure file or figure data to store. + figure_name: Name of the figure. If ``None``, the figure file name, if + given, or a generated name is used. + + Returns: + A tuple of the name and size of the saved figure. + + Raises: + ExperimentEntryExists: If the figure already exits. + """ + pass + + @abstractmethod + def update_figure( + self, experiment_id: str, figure: Union[str, bytes], figure_name: str + ) -> Tuple[str, int]: + """Update an existing figure. + + Args: + experiment_id: Experiment ID. + figure: Path of the figure file or figure data to store. + figure_name: Name of the figure. + + Returns: + A tuple of the name and size of the saved figure. + + Raises: + ExperimentEntryNotFound: If the figure does not exist. + """ + pass + + @abstractmethod + def figure( + self, experiment_id: str, figure_name: str, file_name: Optional[str] = None + ) -> Union[int, bytes]: + """Retrieve an existing figure. + + Args: + experiment_id: Experiment ID. + figure_name: Name of the figure. + file_name: Name of the local file to save the figure to. If ``None``, + the content of the figure is returned instead. + + Returns: + The size of the figure if `file_name` is specified. Otherwise the + content of the figure in bytes. + + Raises: + ExperimentEntryNotFound: If the figure does not exist. + """ + pass + + @abstractmethod + def delete_figure( + self, + experiment_id: str, + figure_name: str, + ) -> None: + """Delete an existing figure. + + Args: + experiment_id: Experiment ID. + figure_name: Name of the figure. + """ + pass + + def set_options(self, **fields): + """Set the options fields for the service. + + Args: + fields: The fields to update the options + + Raises: + AttributeError: If the field passed in is not part of the + options + """ + for field in fields: + if field not in self._options: + raise AttributeError("Options field %s is not valid for this " "service." % field) + self._options.update_options(**fields) + + def option(self, field: str) -> Any: + """Get the value of the specified option. + + Args: + field: Option field to retrieve. + + Returns: + Option value. + + Raises: + AttributeError: If the input field is not valid for the service. + """ + if field not in self._options: + raise AttributeError(f"Options field {field} is not valid for this service.") + return self._options[field] + + @property + def options(self) -> Options: + """Return the options for the service.""" + return self._options diff --git a/qiskit_experiments/database_service/db_analysis_result.py b/qiskit_experiments/database_service/db_analysis_result.py new file mode 100644 index 0000000000..d6b7ce5a96 --- /dev/null +++ b/qiskit_experiments/database_service/db_analysis_result.py @@ -0,0 +1,378 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Analysis result abstract interface.""" + +import logging +from typing import Optional, List, Union, Dict, Callable, Any +import uuid +import json +import copy +from functools import wraps + +from .database_service import DatabaseServiceV1 +from .json import NumpyEncoder, NumpyDecoder +from .utils import save_data, qiskit_version +from .exceptions import DbExperimentDataError +from .device_component import DeviceComponent, to_component + +LOG = logging.getLogger(__name__) + + +def auto_save(func: Callable): + """Decorate the input function to auto save data.""" + + @wraps(func) + def _wrapped(self, *args, **kwargs): + return_val = func(self, *args, **kwargs) + if self.auto_save: + self.save() + return return_val + + return _wrapped + + +class DbAnalysisResult: + """Base common type for all versioned DbAnalysisResult abstract classes. + + Note this class should not be inherited from directly, it is intended + to be used for type checking. When implementing a custom DbAnalysisResult + you should use the versioned classes as the parent class and not this class + directly. + """ + + version = 0 + + +class DbAnalysisResultV1(DbAnalysisResult): + """Class representing an analysis result for an experiment. + + Analysis results can also be stored in a database. + """ + + version = 1 + _data_version = 1 + + _json_encoder = NumpyEncoder + _json_decoder = NumpyDecoder + + _extra_data = {} + + def __init__( + self, + result_data: Dict, + result_type: str, + device_components: List[Union[DeviceComponent, str]], + experiment_id: str, + result_id: Optional[str] = None, + quality: Optional[str] = None, + verified: bool = False, + tags: Optional[List[str]] = None, + service: Optional[DatabaseServiceV1] = None, + **kwargs, + ): + """AnalysisResult constructor. + + Args: + result_data: Analysis result data. + result_type: Analysis result type. + device_components: Target device components this analysis is for. + experiment_id: ID of the experiment. + result_id: Result ID. If ``None``, one is generated. + quality: Quality of the analysis. Refer to the experiment service + provider for valid values. + verified: Whether the result quality has been verified. + tags: Tags for this analysis result. + service: Experiment service to be used to store result in database. + **kwargs: Additional analysis result attributes. + """ + result_data = result_data or {} + self._result_data = copy.deepcopy(result_data) + self._source = self._result_data.pop( + "_source", + { + "class": f"{self.__class__.__module__}.{self.__class__.__name__}", + "data_version": self._data_version, + "qiskit_version": qiskit_version(), + }, + ) + + # Data to be stored in DB. + self._experiment_id = experiment_id + self._id = result_id or str(uuid.uuid4()) + self._type = result_type + self._device_components = [] + for comp in device_components: + if isinstance(comp, str): + comp = to_component(comp) + self._device_components.append(comp) + + self._quality = quality + self._quality_verified = verified + self._tags = tags or [] + + # Other attributes. + self._service = service + self._created_in_db = False + self.auto_save = False + if self._service: + try: + self.auto_save = self._service.option("auto_save") + except AttributeError: + pass + self._extra_data = kwargs + + def serialize_data(self) -> str: + """Serialize result data into JSON string. + + Returns: + Serialized JSON string. + """ + return json.dumps(self._result_data, cls=self._json_encoder) + + @classmethod + def deserialize_data(cls, data: str) -> Dict: + """Deserialize experiment from JSON string. + + Args: + data: Data to be deserialized. + + Returns: + Deserialized data. + """ + return json.loads(data, cls=cls._json_decoder) + + @classmethod + def from_data( + cls, + result_data: Dict, + result_type: str, + device_components: List[Union[DeviceComponent, str]], + experiment_id: str, + **kwargs, + ) -> "DbAnalysisResultV1": + """Reconstruct the analysis result from input data. + + Args: + result_data: Analysis result data. + result_type: Analysis result type. + device_components: Target device components this analysis is for. + experiment_id: ID of the experiment. + **kwargs: Additional analysis result attributes. + + Returns: + Reconstructed analysis result. + """ + if result_data: + result_data = cls.deserialize_data(json.dumps(result_data)) + return cls( + result_data=result_data, + result_type=result_type, + device_components=device_components, + experiment_id=experiment_id, + **kwargs, + ) + + def save(self) -> None: + """Save this analysis result in the database. + + Raises: + DbExperimentDataError: If the analysis result contains invalid data. + """ + if not self._service: + LOG.warning( + "Analysis result cannot be saved because no experiment service is available." + ) + return + + _result_data = json.loads(self.serialize_data()) + _result_data["_source"] = self._source + + new_data = { + "experiment_id": self._experiment_id, + "result_type": self.result_type, + "device_components": self.device_components, + } + update_data = { + "result_id": self.result_id, + "data": _result_data, + "tags": self.tags(), + "quality": self.quality, + "verified": self.verified, + } + + self._created_in_db, _ = save_data( + is_new=(not self._created_in_db), + new_func=self._service.create_analysis_result, + update_func=self._service.update_analysis_result, + new_data=new_data, + update_data=update_data, + ) + + def data(self) -> Dict: + """Return analysis result data. + + Returns: + Analysis result data. + """ + return self._result_data + + @auto_save + def set_data(self, new_data: Dict) -> None: + """Set result data. + + Args: + new_data: New analysis result data. + """ + self._result_data = new_data + + def tags(self): + """Return tags associated with this result.""" + return self._tags + + @auto_save + def set_tags(self, new_tags: List[str]) -> None: + """Set tags for this result. + + Args: + new_tags: New tags for the result. + """ + self._tags = new_tags + + @property + def result_id(self) -> str: + """Return analysis result ID. + + Returns: + ID for this analysis result. + """ + return self._id + + @property + def result_type(self) -> str: + """Return analysis result type. + + Returns: + Analysis result type. + """ + return self._type + + @property + def source(self) -> Dict: + """Return the class name and version.""" + return self._source + + @property + def quality(self) -> str: + """Return the quality of this analysis. + + Returns: + Quality of this analysis. + """ + return self._quality + + @quality.setter + def quality(self, new_quality: str) -> None: + """Set the quality of this analysis. + + Args: + new_quality: New analysis quality. + """ + self._quality = new_quality + if self.auto_save: + self.save() + + @property + def verified(self) -> bool: + """Return the verified flag. + + The ``verified`` flag is intended to indicate whether the quality + value has been verified by a human. + + Returns: + Whether the quality has been verified. + """ + return self._quality_verified + + @verified.setter + def verified(self, verified: bool) -> None: + """Set the verified flag. + + Args: + verified: Whether the quality is verified. + """ + self._quality_verified = verified + if self.auto_save: + self.save() + + @property + def experiment_id(self) -> str: + """Return the ID of the experiment associated with this analysis result. + + Returns: + ID of experiment associated with this analysis result. + """ + return self._experiment_id + + @property + def service(self) -> Optional[DatabaseServiceV1]: + """Return the database service. + + Returns: + Service that can be used to store this analysis result in a database. + ``None`` if not available. + """ + return self._service + + @service.setter + def service(self, service: DatabaseServiceV1) -> None: + """Set the service to be used for storing result data in a database. + + Args: + service: Service to be used. + + Raises: + DbExperimentDataError: If an experiment service is already being used. + """ + if self._service: + raise DbExperimentDataError("An experiment service is already being used.") + self._service = service + + @property + def device_components(self) -> List[DeviceComponent]: + """Return target device components for this analysis result. + + Returns: + Target device components. + """ + return self._device_components + + def __getattr__(self, name: str) -> Any: + try: + return self._extra_data[name] + except KeyError: + # pylint: disable=raise-missing-from + raise AttributeError("Attribute %s is not defined" % name) + + def __str__(self): + ret = f"\nAnalysis Result: {self.result_type}" + ret += f"\nAnalysis Result ID: {self.result_id}" + ret += f"\nExperiment ID: {self.experiment_id}" + ret += f"\nDevice Components: {self.device_components}" + ret += f"\nQuality: {self.quality}" + ret += f"\nVerified: {self.verified}" + if self.tags(): + ret += f"\nTags: {self.tags()}" + ret += "\nResult Data:" + for key, val in self.data().items(): + ret += f"\n - {key}: {val}" + return ret diff --git a/qiskit_experiments/database_service/db_experiment_data.py b/qiskit_experiments/database_service/db_experiment_data.py new file mode 100644 index 0000000000..ef5d1b26c2 --- /dev/null +++ b/qiskit_experiments/database_service/db_experiment_data.py @@ -0,0 +1,1048 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Stored data class.""" + +import logging +import uuid +import json +from typing import Optional, List, Any, Union, Callable, Dict +import copy +from concurrent import futures +from functools import wraps +import traceback +import contextlib +from collections import deque +from datetime import datetime + +from qiskit.providers import Job, BaseJob, Backend, BaseBackend, Provider +from qiskit.result import Result +from qiskit.providers.jobstatus import JobStatus, JOB_FINAL_STATES +from qiskit.providers.exceptions import JobError +from qiskit.visualization import HAS_MATPLOTLIB + +from .database_service import DatabaseServiceV1 +from .exceptions import DbExperimentDataError, DbExperimentEntryNotFound, DbExperimentEntryExists +from .db_analysis_result import DbAnalysisResultV1 as DbAnalysisResult +from .json import NumpyEncoder, NumpyDecoder +from .utils import ( + save_data, + qiskit_version, + plot_to_svg_bytes, + ThreadSafeOrderedDict, + ThreadSafeList, +) + +LOG = logging.getLogger(__name__) + + +def auto_save(func: Callable): + """Decorate the input function to auto save data.""" + + @wraps(func) + def _wrapped(self, *args, **kwargs): + return_val = func(self, *args, **kwargs) + if self.auto_save: + self.save() + return return_val + + return _wrapped + + +@contextlib.contextmanager +def service_exception_to_warning(): + """Convert an exception raised by experiment service to a warning.""" + try: + yield + except Exception: # pylint: disable=broad-except + LOG.warning("Experiment service operation failed: %s", traceback.format_exc()) + + +class DbExperimentData: + """Base common type for all versioned DbExperimentData classes. + + Note this class should not be inherited from directly, it is intended + to be used for type checking. When implementing a custom DbExperimentData class, + you should use the versioned classes as the parent class and not this class + directly. + """ + + version = 0 + + +class DbExperimentDataV1(DbExperimentData): + """Class to define and handle experiment data stored in a database. + + This class serves as a container for experiment related data to be stored + in a database, which may include experiment metadata, analysis results, + and figures. It also provides methods used to interact with the database, + such as storing into and retrieving from the database. + """ + + version = 1 + _metadata_version = 1 + _executor = futures.ThreadPoolExecutor() + """Threads used for asynchronous processing.""" + + _json_encoder = NumpyEncoder + _json_decoder = NumpyDecoder + + def __init__( + self, + experiment_type: str, + backend: Optional[Union[Backend, BaseBackend]] = None, + experiment_id: Optional[str] = None, + tags: Optional[List[str]] = None, + job_ids: Optional[List[str]] = None, + share_level: Optional[str] = None, + metadata: Optional[Dict] = None, + figure_names: Optional[List[str]] = None, + notes: Optional[str] = None, + **kwargs, + ): + """Initializes the DbExperimentData instance. + + Args: + experiment_type: Experiment type. + backend: Backend the experiment runs on. + experiment_id: Experiment ID. One will be generated if not supplied. + tags: Tags to be associated with the experiment. + job_ids: IDs of jobs submitted for the experiment. + share_level: Whether this experiment can be shared with others. This + is applicable only if the database service supports sharing. See + the specific service provider's documentation on valid values. + metadata: Additional experiment metadata. + figure_names: Name of figures associated with this experiment. + notes: Freeform notes about the experiment. + **kwargs: Additional experiment attributes. + """ + metadata = metadata or {} + self._metadata = copy.deepcopy(metadata) + self._source = self._metadata.pop( + "_source", + { + "class": f"{self.__class__.__module__}.{self.__class__.__name__}", + "metadata_version": self._metadata_version, + "qiskit_version": qiskit_version(), + }, + ) + + self._service = None + self._backend = backend + self.auto_save = False + self._set_service_from_backend(backend) + + self._id = experiment_id or str(uuid.uuid4()) + self._type = experiment_type + self._tags = tags or [] + self._share_level = share_level + self._notes = notes or "" + + self._jobs = ThreadSafeOrderedDict(job_ids or []) + self._job_futures = ThreadSafeList() + self._errors = [] + + self._data = ThreadSafeList() + self._figures = ThreadSafeOrderedDict(figure_names or []) + self._analysis_results = ThreadSafeOrderedDict() + + self._deleted_figures = deque() + self._deleted_analysis_results = deque() + + self._created_in_db = False + self._extra_data = kwargs + + def _set_service_from_backend(self, backend: Union[Backend, BaseBackend]) -> None: + """Set the service to be used from the input backend. + + Args: + backend: Backend whose provider may offer experiment service. + """ + with contextlib.suppress(Exception): + self._service = backend.provider().service("experiment") + self.auto_save = self._service.options.get("auto_save", False) + + def add_data( + self, + data: Union[Result, List[Result], Job, List[Job], Dict, List[Dict]], + post_processing_callback: Optional[Callable] = None, + timeout: Optional[float] = None, + **kwargs: Any, + ) -> None: + """Add experiment data. + + Note: + This method is not thread safe and should not be called by the + `post_processing_callback` function. + + Note: + If `data` is a ``Job``, this method waits for the job to finish + and calls the `post_processing_callback` function asynchronously. + + Args: + data: Experiment data to add. + Several types are accepted for convenience: + + * Result: Add data from this ``Result`` object. + * List[Result]: Add data from the ``Result`` objects. + * Job: Add data from the job result. + * List[Job]: Add data from the job results. + * Dict: Add this data. + * List[Dict]: Add this list of data. + + post_processing_callback: Callback function invoked when data is + added. If `data` is a ``Job``, the callback is only invoked when + the job finishes successfully. + The following positional arguments are provided to the callback function: + + * This ``DbExperimentData`` object. + * Additional keyword arguments passed to this method. + + timeout: Timeout waiting for job to finish, if `data` is a ``Job``. + + **kwargs: Keyword arguments to be passed to the callback function. + + Raises: + TypeError: If the input data type is invalid. + """ + with self._job_futures.lock: + if any(not fut.done() for _, fut in self._job_futures): + LOG.warning( + "Not all post-processing has finished. Adding new data " + "may create unexpected analysis results." + ) + + if isinstance(data, (Job, BaseJob)): + if self.backend and self.backend.name() != data.backend().name(): + LOG.warning( + "Adding a job from a backend (%s) that is different " + "than the current backend (%s). " + "The new backend will be used, but " + "service is not changed if one already exists.", + data.backend(), + self.backend, + ) + self._backend = data.backend() + if not self._service: + self._set_service_from_backend(self._backend) + + self._jobs[data.job_id()] = data + self._job_futures.append( + ( + data, + self._executor.submit( + self._wait_for_job, data, post_processing_callback, timeout, **kwargs + ), + ) + ) + if self.auto_save: + self.save() + return + + if isinstance(data, dict): + self._add_single_data(data) + elif isinstance(data, Result): + self._add_result_data(data) + elif isinstance(data, list): + for dat in data: + self.add_data(dat) + else: + raise TypeError(f"Invalid data type {type(data)}.") + + if post_processing_callback is not None: + post_processing_callback(self, **kwargs) + + def _wait_for_job( + self, + job: Union[Job, BaseJob], + job_done_callback: Optional[Callable] = None, + timeout: Optional[float] = None, + **kwargs: Any, + ) -> None: + """Wait for a job to finish. + + Args: + job: Job to wait for. + job_done_callback: Callback function to invoke when job finishes. + timeout: Timeout waiting for job to finish. + **kwargs: Keyword arguments to be passed to the callback function. + + Raises: + Exception: If post processing failed. + """ + LOG.debug("Waiting for job %s to finish.", job.job_id()) + try: + try: + job_result = job.result(timeout=timeout) + except TypeError: # Not all jobs take timeout. + job_result = job.result() + with self._data.lock: + # Hold the lock so we add the block of results together. + self._add_result_data(job_result) + except JobError as err: + LOG.warning("Job %s failed: %s", job.job_id(), str(err)) + return + + try: + if job_done_callback: + job_done_callback(self, **kwargs) + except Exception: # pylint: disable=broad-except + LOG.warning("Post processing function failed:\n%s", traceback.format_exc()) + raise + + def _add_result_data(self, result: Result) -> None: + """Add data from a Result object + + Args: + result: Result object containing data to be added. + """ + if result.job_id not in self._jobs: + self._jobs[result.job_id] = None + for i in range(len(result.results)): + data = result.data(i) + data["job_id"] = result.job_id + if "counts" in data: + # Format to Counts object rather than hex dict + data["counts"] = result.get_counts(i) + if "memory" in data: + data["memory"] = result.get_memory(i) + expr_result = result.results[i] + if hasattr(expr_result, "header") and hasattr(expr_result.header, "metadata"): + data["metadata"] = expr_result.header.metadata + data["shots"] = expr_result.shots + data["meas_level"] = expr_result.meas_level + if hasattr(expr_result, "meas_return"): + data["meas_return"] = expr_result.meas_return + self._add_single_data(data) + + def _add_single_data(self, data: Dict[str, any]) -> None: + """Add a single data dictionary to the experiment. + + Args: + data: Data to be added. + """ + self._data.append(data) + + def data(self, index: Optional[Union[int, slice, str]] = None) -> Union[Dict, List[Dict]]: + """Return the experiment data at the specified index. + + Args: + index: Index of the data to be returned. + Several types are accepted for convenience: + + * None: Return all experiment data. + * int: Specific index of the data. + * slice: A list slice of data indexes. + * str: ID of the job that produced the data. + + Returns: + Experiment data. + + Raises: + TypeError: If the input `index` has an invalid type. + """ + # Get job results if missing experiment data. + if (not self._data) and self._provider: + with self._jobs.lock: + for jid in self._jobs: + if self._jobs[jid] is None: + try: + self._jobs[jid] = self._provider.retrieve_job(jid) + except Exception: # pylint: disable=broad-except + pass + if self._jobs[jid] is not None: + self._add_result_data(self._jobs[jid].result()) + + if index is None: + return self._data.copy() + if isinstance(index, (int, slice)): + return self._data[index] + if isinstance(index, str): + return [data for data in self._data if data.get("job_id") == index] + raise TypeError(f"Invalid index type {type(index)}.") + + @auto_save + def add_figures( + self, + figures, + figure_names=None, + overwrite=False, + save_figure=None, + ) -> Union[str, List[str]]: + """Add the experiment figure. + + Args: + figures (str or bytes or pyplot.Figure or list): Paths of the figure + files or figure data. + figure_names (str or list): Names of the figures. If ``None``, use the figure file + names, if given, or a generated name. If `figures` is a list, then + `figure_names` must also be a list of the same length or ``None``. + overwrite (bool): Whether to overwrite the figure if one already exists with + the same name. + save_figure (bool): Whether to save the figure in the database. If ``None``, + the ``auto-save`` attribute is used. + + Returns: + str or list: + Figure names. + + Raises: + DbExperimentEntryExists: If the figure with the same name already exists, + and `overwrite=True` is not specified. + ValueError: If an input parameter has an invalid value. + """ + if ( + isinstance(figures, list) + and figure_names is not None + and (not isinstance(figure_names, list) or len(figures) != len(figure_names)) + ): + raise ValueError( + "The parameter figure_names must be None or a list of " + "the same size as the parameter figures." + ) + if not isinstance(figures, list): + figures = [figures] + if figure_names is not None and not isinstance(figure_names, list): + figure_names = [figure_names] + + added_figs = [] + for idx, figure in enumerate(figures): + if figure_names is None: + if isinstance(figure, str): + fig_name = figure + else: + fig_name = ( + f"figure_{self.experiment_id[:8]}_" + f"{datetime.now().isoformat()}_{len(self._figures)}" + ) + else: + fig_name = figure_names[idx] + + existing_figure = fig_name in self._figures + if existing_figure and not overwrite: + raise DbExperimentEntryExists( + f"A figure with the name {fig_name} for this experiment " + f"already exists. Specify overwrite=True if you " + f"want to overwrite it." + ) + # figure_data = None + if isinstance(figure, str): + with open(figure, "rb") as file: + figure = file.read() + + self._figures[fig_name] = figure + + save = save_figure if save_figure is not None else self.auto_save + if save and self._service: + if HAS_MATPLOTLIB: + # pylint: disable=import-error + from matplotlib import pyplot + + if isinstance(figure, pyplot.Figure): + figure = plot_to_svg_bytes(figure) + data = { + "experiment_id": self.experiment_id, + "figure": figure, + "figure_name": fig_name, + } + save_data( + is_new=(not existing_figure), + new_func=self._service.create_figure, + update_func=self._service.update_figure, + new_data={}, + update_data=data, + ) + added_figs.append(fig_name) + + return added_figs if len(added_figs) > 1 else added_figs[0] + + @auto_save + def delete_figure( + self, + figure_key: Union[str, int], + ) -> str: + """Add the experiment figure. + + Args: + figure_key: Name or index of the figure. + + Returns: + Figure name. + + Raises: + DbExperimentEntryNotFound: If the figure is not found. + """ + if isinstance(figure_key, int): + figure_key = self._figures.keys()[figure_key] + elif figure_key not in self._figures: + raise DbExperimentEntryNotFound(f"Figure {figure_key} not found.") + + del self._figures[figure_key] + self._deleted_figures.append(figure_key) + + if self._service and self.auto_save: + with service_exception_to_warning(): + self.service.delete_figure(experiment_id=self.experiment_id, figure_name=figure_key) + self._deleted_figures.remove(figure_key) + + return figure_key + + def figure( + self, figure_key: Union[str, int], file_name: Optional[str] = None + ) -> Union[int, bytes]: + """Retrieve the specified experiment figure. + + Args: + figure_key: Name or index of the figure. + file_name: Name of the local file to save the figure to. If ``None``, + the content of the figure is returned instead. + + Returns: + The size of the figure if `file_name` is specified. Otherwise the + content of the figure in bytes. + + Raises: + DbExperimentEntryNotFound: If the figure cannot be found. + """ + if isinstance(figure_key, int): + figure_key = self._figures.keys()[figure_key] + + figure_data = self._figures.get(figure_key, None) + if figure_data is None and self.service: + figure_data = self.service.figure( + experiment_id=self.experiment_id, figure_name=figure_key + ) + self._figures[figure_key] = figure_data + + if figure_data is None: + raise DbExperimentEntryNotFound(f"Figure {figure_key} not found.") + + if file_name: + with open(file_name, "wb") as output: + num_bytes = output.write(figure_data) + return num_bytes + return figure_data + + @auto_save + def add_analysis_results( + self, + results: Union[DbAnalysisResult, List[DbAnalysisResult]], + ) -> None: + """Save the analysis result. + + Args: + results: Analysis results to be saved. + """ + if not isinstance(results, list): + results = [results] + + for result in results: + self._analysis_results[result.result_id] = result + + with contextlib.suppress(DbExperimentDataError): + result.service = self.service + result.auto_save = self.auto_save + + if self.auto_save and self._service: + result.save() + + @auto_save + def delete_analysis_result( + self, + result_key: Union[int, str], + ) -> str: + """Delete the analysis result. + + Args: + result_key: ID or index of the analysis result to be delete. + + Returns: + Analysis result ID. + + Raises: + DbExperimentEntryNotFound: If analysis result not found. + """ + + if isinstance(result_key, int): + result_key = self._analysis_results.keys()[result_key] + else: + # Retrieve from DB if needed. + result_key = self.analysis_results(result_key).result_id + + del self._analysis_results[result_key] + self._deleted_analysis_results.append(result_key) + + if self._service and self.auto_save: + with service_exception_to_warning(): + self.service.delete_analysis_result(result_id=result_key) + self._deleted_analysis_results.remove(result_key) + + return result_key + + def analysis_results( + self, index: Optional[Union[int, slice, str]] = None, refresh: bool = False + ) -> Union[DbAnalysisResult, List[DbAnalysisResult]]: + """Return analysis results associated with this experiment. + + Args: + index: Index of the analysis result to be returned. + Several types are accepted for convenience: + + * None: Return all analysis results. + * int: Specific index of the analysis results. + * slice: A list slice of indexes. + * str: ID of the analysis result. + refresh: Retrieve the latest analysis results from the server, if + an experiment service is available. + + Returns: + Analysis results for this experiment. + + Raises: + TypeError: If the input `index` has an invalid type. + DbExperimentEntryNotFound: If the entry cannot be found. + """ + if self.service and (not self._analysis_results or refresh): + retrieved_results = self.service.analysis_results( + experiment_id=self.experiment_id, limit=None + ) + for result in retrieved_results: + self._analysis_results[result["result_id"]] = result + + if index is None: + return self._analysis_results.values() + if isinstance(index, (int, slice)): + return self._analysis_results.values()[index] + if isinstance(index, str): + if index not in self._analysis_results: + raise DbExperimentEntryNotFound(f"Analysis result {index} not found.") + return self._analysis_results[index] + + raise TypeError(f"Invalid index type {type(index)}.") + + def save(self) -> None: + """Save this experiment in the database. + + Note: + Not all experiment properties are saved. + See :meth:`qiskit.providers.experiment.DatabaseServiceV1.create_experiment` + for fields that are saved. + + Note: + Note that this method does not save analysis results nor figures. Use + ``save_all()`` if you want to save those. + """ + if not self._service: + LOG.warning("Experiment cannot be saved because no experiment service is available.") + return + + if not self._backend: + LOG.warning("Experiment cannot be saved because backend is missing.") + return + + metadata = json.loads(self.serialize_metadata()) + metadata["_source"] = self._source + + update_data = { + "experiment_id": self._id, + "metadata": metadata, + "job_ids": self.job_ids, + "tags": self.tags(), + "notes": self.notes, + } + new_data = {"experiment_type": self._type, "backend_name": self._backend.name()} + if self.share_level: + update_data["share_level"] = self.share_level + + self._created_in_db, _ = save_data( + is_new=(not self._created_in_db), + new_func=self._service.create_experiment, + update_func=self._service.update_experiment, + new_data=new_data, + update_data=update_data, + ) + + def save_all(self) -> None: + """Save this experiment and its analysis results and figures in the database. + + Note: + Depending on the amount of data, this operation could take a while. + """ + # TODO - track changes + if not self._service: + LOG.warning("Experiment cannot be saved because no experiment service is available.") + return + + self.save() + for result in self._analysis_results.values(): + result.save() + + for result in self._deleted_analysis_results.copy(): + with service_exception_to_warning(): + self._service.delete_analysis_result(result_id=result) + self._deleted_analysis_results.remove(result) + + with self._figures.lock: + for name, figure in self._figures.items(): + if HAS_MATPLOTLIB: + # pylint: disable=import-error + from matplotlib import pyplot + + if isinstance(figure, pyplot.Figure): + figure = plot_to_svg_bytes(figure) + data = {"experiment_id": self.experiment_id, "figure": figure, "figure_name": name} + save_data( + is_new=True, + new_func=self._service.create_figure, + update_func=self._service.update_figure, + new_data={}, + update_data=data, + ) + + for name in self._deleted_figures.copy(): + with service_exception_to_warning(): + self._service.delete_figure(experiment_id=self.experiment_id, figure_name=name) + self._deleted_figures.remove(name) + + def serialize_metadata(self) -> str: + """Serialize experiment metadata into a JSON string. + + Returns: + Serialized JSON string. + """ + return json.dumps(self._metadata, cls=self._json_encoder) + + @classmethod + def deserialize_metadata(cls, data: str) -> Any: + """Deserialize experiment metadata from a JSON string. + + Args: + data: Data to be deserialized. + + Returns: + Deserialized data. + """ + return json.loads(data, cls=cls._json_decoder) + + @classmethod + def from_data( + cls, + experiment_type: str, + experiment_id: str, + metadata: Optional[Dict] = None, + **kwargs, + ) -> "DbExperimentDataV1": + """Reconstruct a DbExperimentDataV1 using the input data. + + Args: + experiment_type: Experiment type. + experiment_id: Experiment ID. One will be generated if not supplied. + metadata: Additional experiment metadata. + **kwargs: Additional experiment attributes. + + Returns: + An DbExperimentDataV1 instance. + """ + if metadata: + metadata = cls.deserialize_metadata(json.dumps(metadata)) + return cls( + experiment_type=experiment_type, + experiment_id=experiment_id, + metadata=metadata, + **kwargs, + ) + + def cancel_jobs(self) -> None: + """Cancel any running jobs.""" + for job, fut in self._job_futures.copy(): + if not fut.done() and job.status() not in JOB_FINAL_STATES: + try: + job.cancel() + except Exception as err: # pylint: disable=broad-except + LOG.info("Unable to cancel job %s: %s", job.job_id(), err) + + def block_for_results(self, timeout: Optional[float] = None) -> None: + """Block until all pending jobs and their post processing finish. + + Args: + timeout: Timeout waiting for results. + """ + for job, fut in self._job_futures.copy(): + LOG.info("Waiting for job %s and its post processing to finish.", job.job_id()) + with contextlib.suppress(Exception): + fut.result(timeout) + + def status(self) -> str: + """Return the data processing status. + + If the experiment consists of multiple jobs, the returned status is mapped + in the following order: + + * INITIALIZING - if any job is being initialized. + * VALIDATING - if any job is being validated. + * QUEUED - if any job is queued. + * RUNNING - if any job is still running. + * ERROR - if any job incurred an error. + * CANCELLED - if any job is cancelled. + * POST_PROCESSING - if any of the post-processing functions is still running. + * DONE - if all jobs and their post-processing functions finished. + + Returns: + Data processing status. + """ + if all( + len(container) == 0 + for container in [self._data, self._jobs, self._figures, self._analysis_results] + ): + return "INITIALIZING" + + statuses = set() + with self._job_futures.lock: + for idx, item in enumerate(self._job_futures): + job, fut = item + job_status = job.status() + statuses.add(job_status) + if job_status == JobStatus.ERROR: + job_err = "." + if hasattr(job, "error_message"): + job_err = ": " + job.error_message() + self._errors.append(f"Job {job.job_id()} failed{job_err}") + + if fut.done(): + self._job_futures[idx] = None + ex = fut.exception() + if ex: + self._errors.append( + f"Post processing for job {job.job_id()} failed: \n" + + "".join(traceback.format_exception(type(ex), ex, ex.__traceback__)) + ) + statuses.add(JobStatus.ERROR) + + self._job_futures = ThreadSafeList(list(filter(None, self._job_futures))) + + for stat in [ + JobStatus.INITIALIZING, + JobStatus.VALIDATING, + JobStatus.QUEUED, + JobStatus.RUNNING, + JobStatus.ERROR, + JobStatus.CANCELLED, + ]: + if stat in statuses: + return stat.name + + if self._job_futures: + return "POST_PROCESSING" + + return "DONE" + + def errors(self) -> str: + """Return errors encountered. + + Returns: + Experiment errors. + """ + self.status() # Collect new errors. + return "\n".join(self._errors) + + def tags(self) -> List[str]: + """Return tags assigned to this experiment data. + + Returns: + A list of tags assigned to this experiment data. + + """ + return self._tags + + @auto_save + def set_tags(self, new_tags: List[str]) -> None: + """Set tags for this experiment. + + Args: + new_tags: New tags for the experiment. + """ + self._tags = new_tags + + def metadata(self) -> Dict: + """Return experiment metadata. + + Returns: + Experiment metadata. + """ + return self._metadata + + @auto_save + def set_metadata(self, metadata: Dict) -> None: + """Set metadata for this experiment. + + Args: + metadata: New metadata for the experiment. + """ + self._metadata = copy.deepcopy(metadata) + + @property + def _provider(self) -> Optional[Provider]: + """Return the provider. + + Returns: + Provider used for the experiment, or ``None`` if unknown. + """ + if self._backend is None: + return None + return self._backend.provider() + + @property + def experiment_id(self) -> str: + """Return experiment ID + + Returns: + Experiment ID. + """ + return self._id + + @property + def job_ids(self) -> List[str]: + """Return experiment job IDs. + + Returns: IDs of jobs submitted for this experiment. + """ + return self._jobs.keys() + + @property + def backend(self) -> Optional[Union[BaseBackend, Backend]]: + """Return backend. + + Returns: + Backend this experiment is for, or ``None`` if backend is unknown. + """ + return self._backend + + @property + def experiment_type(self) -> str: + """Return experiment type. + + Returns: + Experiment type. + """ + return self._type + + @property + def figure_names(self) -> List[str]: + """Return names of the figures associated with this experiment. + + Returns: + Names of figures associated with this experiment. + """ + return self._figures.keys() + + @property + def share_level(self) -> str: + """Return the share level fo this experiment. + + Returns: + Experiment share level. + """ + return self._share_level + + @share_level.setter + def share_level(self, new_level: str) -> None: + """Set the experiment share level. + + Args: + new_level: New experiment share level. Valid share levels are provider- + specified. For example, IBM Quantum experiment service allows + "public", "hub", "group", "project", and "private". + """ + self._share_level = new_level + if self.auto_save: + self.save() + + @property + def notes(self) -> str: + """Return experiment notes. + + Returns: + Experiment notes. + """ + return self._notes + + @notes.setter + def notes(self, new_notes: str) -> None: + """Update experiment notes. + + Args: + new_notes: New experiment notes. + """ + self._notes = new_notes + if self.auto_save: + self.save() + + @property + def service(self) -> Optional[DatabaseServiceV1]: + """Return the database service. + + Returns: + Service that can be used to access this experiment in a database. + """ + return self._service + + @service.setter + def service(self, service: DatabaseServiceV1) -> None: + """Set the service to be used for storing experiment data. + + Args: + service: Service to be used. + + Raises: + DbExperimentDataError: If an experiment service is already being used. + """ + if self._service: + raise DbExperimentDataError("An experiment service is already being used.") + self._service = service + with contextlib.suppress(Exception): + self.auto_save = self._service.options.get("auto_save", False) + + @property + def source(self) -> Dict: + """Return the class name and version.""" + return self._source + + def __str__(self): + line = 51 * "-" + n_res = len(self._analysis_results) + status = self.status() + ret = line + ret += f"\nExperiment: {self.experiment_type}" + ret += f"\nExperiment ID: {self.experiment_id}" + ret += f"\nStatus: {status}" + if status == "ERROR": + ret += "\n " + ret += "\n ".join(self._errors) + if self.backend: + ret += f"\nBackend: {self.backend}" + if self.tags(): + ret += f"\nTags: {self.tags()}" + ret += f"\nData: {len(self._data)}" + ret += f"\nAnalysis Results: {n_res}" + ret += f"\nFigures: {len(self._figures)}" + ret += "\n" + line + if n_res: + ret += "\nLast Analysis Result:" + ret += f"\n{str(self._analysis_results.values()[-1])}" + return ret + + def __getattr__(self, name: str) -> Any: + try: + return self._extra_data[name] + except KeyError: + # pylint: disable=raise-missing-from + raise AttributeError("Attribute %s is not defined" % name) diff --git a/qiskit_experiments/database_service/device_component.py b/qiskit_experiments/database_service/device_component.py new file mode 100644 index 0000000000..8c602f94dd --- /dev/null +++ b/qiskit_experiments/database_service/device_component.py @@ -0,0 +1,76 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Device component classes.""" + +from abc import ABC, abstractmethod + + +class DeviceComponent(ABC): + """Class representing a device component.""" + + @abstractmethod + def __str__(self): + pass + + def __repr__(self): + return f"<{self.__class__.__name__}({str(self)})>" + + +class Qubit(DeviceComponent): + """Class representing a qubit device component.""" + + def __init__(self, index: int): + self._index = index + + def __str__(self): + return f"Q{self._index}" + + +class Resonator(DeviceComponent): + """Class representing a resonator device component.""" + + def __init__(self, index: int): + self._index = index + + def __str__(self): + return f"R{self._index}" + + +class UnknownComponent(DeviceComponent): + """Class representing unknown device component.""" + + def __init__(self, component: str): + self._component = component + + def __str__(self): + return self._component + + +def to_component(string: str) -> DeviceComponent: + """Convert the input string to a ``DeviceComponent`` instance. + + Args: + string: String to be converted. + + Returns: + A ``DeviceComponent`` instance. + + Raises: + ValueError: If input string is not a valid device component. + """ + if string.startswith("Q"): + return Qubit(int(string[1:])) + elif string.startswith("R"): + return Resonator(int(string[1:])) + else: + return UnknownComponent(string) diff --git a/qiskit_experiments/database_service/exceptions.py b/qiskit_experiments/database_service/exceptions.py new file mode 100644 index 0000000000..3e9af85fac --- /dev/null +++ b/qiskit_experiments/database_service/exceptions.py @@ -0,0 +1,33 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Exceptions for errors raised by experiment service data.""" + +from qiskit.exceptions import QiskitError + + +class DbExperimentDataError(QiskitError): + """Base class for errors raised by experiment service data.""" + + pass + + +class DbExperimentEntryNotFound(DbExperimentDataError): + """Errors raised when an experiment entry cannot be found.""" + + pass + + +class DbExperimentEntryExists(DbExperimentDataError): + """Errors raised when an experiment entry already exists.""" + + pass diff --git a/qiskit_experiments/database_service/json.py b/qiskit_experiments/database_service/json.py new file mode 100644 index 0000000000..8dbb37f2cc --- /dev/null +++ b/qiskit_experiments/database_service/json.py @@ -0,0 +1,47 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. +# pylint: disable=method-hidden + +"""Experiment serialization methods.""" + +import json +from typing import Any + +import numpy as np + + +class NumpyEncoder(json.JSONEncoder): + """JSON Encoder for Numpy arrays and complex numbers.""" + + def default(self, obj: Any) -> Any: # pylint: disable=arguments-differ + if hasattr(obj, "tolist"): + return {"type": "array", "value": obj.tolist()} + if isinstance(obj, complex): + return {"type": "complex", "value": [obj.real, obj.imag]} + return super().default(obj) + + +class NumpyDecoder(json.JSONDecoder): + """JSON Decoder for Numpy arrays and complex numbers.""" + + def __init__(self, *args, **kwargs): + super().__init__(object_hook=self.object_hook, *args, **kwargs) + + def object_hook(self, obj): + """Object hook.""" + if "type" in obj: + if obj["type"] == "complex": + val = obj["value"] + return val[0] + 1j * val[1] + if obj["type"] == "array": + return np.array(obj["value"]) + return obj diff --git a/qiskit_experiments/database_service/utils.py b/qiskit_experiments/database_service/utils.py new file mode 100644 index 0000000000..8c0322f382 --- /dev/null +++ b/qiskit_experiments/database_service/utils.py @@ -0,0 +1,220 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Experiment utility functions.""" + +import logging +from typing import Callable, Tuple, Dict, Any, Union +import io +from datetime import datetime, timezone +import threading +from collections import OrderedDict +from abc import ABC, abstractmethod +import traceback +import pkg_resources + +import dateutil.parser +from dateutil import tz + +from qiskit.version import __version__ as terra_version +from ..version import __version__ as experiments_version + +from .exceptions import DbExperimentEntryNotFound, DbExperimentEntryExists, DbExperimentDataError + +LOG = logging.getLogger(__name__) + + +def qiskit_version(): + """Return the Qiskit version.""" + try: + return pkg_resources.get_distribution("qiskit").version + except Exception: # pylint: disable=broad-except + return {"qiskit-terra": terra_version, "qiskit-experiments": experiments_version} + + +def parse_timestamp(utc_dt: Union[datetime, str]) -> datetime: + """Parse a UTC ``datetime`` object or string. + + Args: + utc_dt: Input UTC `datetime` or string. + + Returns: + A ``datetime`` with the UTC timezone. + + Raises: + TypeError: If the input parameter value is not valid. + """ + if isinstance(utc_dt, str): + utc_dt = dateutil.parser.parse(utc_dt) + if not isinstance(utc_dt, datetime): + raise TypeError("Input `utc_dt` is not string or datetime.") + utc_dt = utc_dt.replace(tzinfo=timezone.utc) + return utc_dt + + +def utc_to_local(utc_dt: datetime) -> datetime: + """Convert input UTC timestamp to local timezone. + + Args: + utc_dt: Input UTC timestamp. + + Returns: + A ``datetime`` with the local timezone. + """ + local_dt = utc_dt.astimezone(tz.tzlocal()) + return local_dt + + +def plot_to_svg_bytes(figure: "pyplot.Figure") -> bytes: + """Convert a pyplot Figure to SVG in bytes. + + Args: + figure: Figure to be converted + + Returns: + Figure in bytes. + """ + buf = io.BytesIO() + opaque_color = list(figure.get_facecolor()) + opaque_color[3] = 1.0 # set alpha to opaque + figure.savefig(buf, format="svg", facecolor=tuple(opaque_color), edgecolor="none") + buf.seek(0) + figure_data = buf.read() + buf.close() + return figure_data + + +def save_data( + is_new: bool, new_func: Callable, update_func: Callable, new_data: Dict, update_data: Dict +) -> Tuple[bool, Any]: + """Save data in the database. + + Args: + is_new: ``True`` if `new_func` should be called. Otherwise `update_func` is called. + new_func: Function to create new entry in the database. + update_func: Function to update an existing entry in the database. + new_data: In addition to `update_data`, this data will be stored if creating + a new entry. + update_data: Data to be stored if updating an existing entry. + + Returns: + A tuple of whether the data was saved and the function return value. + + Raises: + DbExperimentDataError: If unable to determine whether the entry exists. + """ + attempts = 0 + try: + # Attempt 3x for the unlikely scenario wherein is_new=False but the + # entry doesn't actually exists. The second try might also fail if an entry + # with the same ID somehow got created in the meantime. + while attempts < 3: + attempts += 1 + if is_new: + try: + return True, new_func(**{**new_data, **update_data}) + except DbExperimentEntryExists: + is_new = False + else: + try: + return True, update_func(**update_data) + except DbExperimentEntryNotFound: + is_new = True + raise DbExperimentDataError("Unable to determine the existence of the entry.") + except Exception: # pylint: disable=broad-except + # Don't fail the experiment just because its data cannot be saved. + LOG.error("Unable to save the experiment data: %s", traceback.format_exc()) + return False, None + + +class ThreadSafeContainer(ABC): + """Base class for thread safe container.""" + + def __init__(self, init_values=None): + """ThreadSafeContainer constructor.""" + self._lock = threading.RLock() + self._container = self._init_container(init_values) + + @abstractmethod + def _init_container(self, init_values): + """Initialize the container.""" + pass + + def __getitem__(self, key): + with self._lock: + return self._container[key] + + def __setitem__(self, key, value): + with self._lock: + self._container[key] = value + + def __delitem__(self, key): + with self._lock: + del self._container[key] + + def __contains__(self, item): + with self._lock: + return item in self._container + + def __len__(self): + with self._lock: + return len(self._container) + + @property + def lock(self): + """Return lock used for this container.""" + return self._lock + + +class ThreadSafeOrderedDict(ThreadSafeContainer): + """Thread safe OrderedDict.""" + + def _init_container(self, init_values): + """Initialize the container.""" + return OrderedDict.fromkeys(init_values or []) + + def get(self, key, default): + """Return the value of the given key.""" + with self._lock: + return self._container.get(key, default) + + def keys(self): + """Return all key values.""" + with self._lock: + return list(self._container.keys()) + + def values(self): + """Return all values.""" + with self._lock: + return list(self._container.values()) + + def items(self): + """Return the key value pairs.""" + return self._container.items() + + +class ThreadSafeList(ThreadSafeContainer): + """Thread safe list.""" + + def _init_container(self, init_values): + """Initialize the container.""" + return init_values or [] + + def append(self, value): + """Append to the list.""" + with self._lock: + self._container.append(value) + + def copy(self): + """Returns a copy of the list.""" + with self.lock: + return self._container.copy() diff --git a/test/database_service/__init__.py b/test/database_service/__init__.py new file mode 100644 index 0000000000..e0bf9c8b4e --- /dev/null +++ b/test/database_service/__init__.py @@ -0,0 +1,13 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Test cases for database service modules.""" diff --git a/test/database_service/test_db_analysis_result.py b/test/database_service/test_db_analysis_result.py new file mode 100644 index 0000000000..c0b7829816 --- /dev/null +++ b/test/database_service/test_db_analysis_result.py @@ -0,0 +1,164 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +# pylint: disable=missing-docstring + +"""Test AnalysisResult.""" + +from unittest import mock +import json + +import numpy as np + +from qiskit.test import QiskitTestCase +from qiskit_experiments.database_service import DbAnalysisResultV1 as DbAnalysisResult +from qiskit_experiments.database_service.device_component import Qubit, Resonator, to_component +from qiskit_experiments.database_service.database_service import DatabaseServiceV1 +from qiskit_experiments.database_service.exceptions import DbExperimentDataError + + +class TestDbAnalysisResult(QiskitTestCase): + """Test the DbAnalysisResult class.""" + + def test_analysis_result_attributes(self): + """Test analysis result attributes.""" + attrs = { + "result_type": "my_type", + "device_components": [Qubit(1), Qubit(2)], + "experiment_id": "1234", + "result_id": "5678", + "quality": "Good", + "verified": False, + } + result = DbAnalysisResult(result_data={"foo": "bar"}, tags=["tag1", "tag2"], **attrs) + self.assertEqual({"foo": "bar"}, result.data()) + self.assertEqual(["tag1", "tag2"], result.tags()) + for key, val in attrs.items(): + self.assertEqual(val, getattr(result, key)) + + def test_save(self): + """Test saving analysis result.""" + mock_service = mock.create_autospec(DatabaseServiceV1) + result = self._new_analysis_result() + result.service = mock_service + result.save() + mock_service.create_analysis_result.assert_called_once() + + def test_auto_save(self): + """Test auto saving.""" + mock_service = mock.create_autospec(DatabaseServiceV1) + result = self._new_analysis_result(service=mock_service) + result.auto_save = True + result.save() + + subtests = [ + # update function, update parameters, service called + (result.set_tags, (["foo"],)), + (result.set_data, ({"foo": "bar"},)), + (setattr, (result, "quality", "GOOD")), + (setattr, (result, "verified", True)), + ] + + for func, params in subtests: + with self.subTest(func=func): + func(*params) + mock_service.update_analysis_result.assert_called_once() + mock_service.reset_mock() + + def test_set_service_init(self): + """Test setting service in init.""" + mock_service = mock.create_autospec(DatabaseServiceV1) + result = self._new_analysis_result(service=mock_service) + self.assertEqual(mock_service, result.service) + + def test_set_service_direct(self): + """Test setting service directly.""" + mock_service = mock.create_autospec(DatabaseServiceV1) + result = self._new_analysis_result() + result.service = mock_service + self.assertEqual(mock_service, result.service) + + with self.assertRaises(DbExperimentDataError): + result.service = mock_service + + def test_set_data(self): + """Test setting data.""" + result = self._new_analysis_result() + result.set_data({"foo": "new data"}) + self.assertEqual({"foo": "new data"}, result.data()) + + def test_set_tags(self): + """Test setting tags.""" + result = self._new_analysis_result() + result.set_tags(["new_tag"]) + self.assertEqual(["new_tag"], result.tags()) + + def test_update_quality(self): + """Test updating quality.""" + result = self._new_analysis_result(quality="BAD") + result.quality = "GOOD" + self.assertEqual("GOOD", result.quality) + + def test_update_verified(self): + """Test updating verified.""" + result = self._new_analysis_result(verified=False) + result.verified = True + self.assertTrue(result.verified) + + def test_additional_attr(self): + """Test additional attributes.""" + result = self._new_analysis_result(foo="bar") + self.assertEqual("bar", result.foo) + + def test_data_serialization(self): + """Test result data serialization.""" + result = self._new_analysis_result(result_data={"complex": 2 + 3j, "numpy": np.zeros(2)}) + serialized = result.serialize_data() + self.assertIsInstance(serialized, str) + self.assertTrue(json.loads(serialized)) + + def test_source(self): + """Test getting analysis result source.""" + result = self._new_analysis_result() + self.assertIn("DbAnalysisResultV1", result.source["class"]) + self.assertTrue(result.source["qiskit_version"]) + + def _new_analysis_result(self, **kwargs): + """Return a new analysis result.""" + values = { + "result_data": {"foo": "bar"}, + "result_type": "some_type", + "device_components": ["Q1", "Q2"], + "experiment_id": "1234", + } + values.update(kwargs) + return DbAnalysisResult(**values) + + +class TestDeviceComponent(QiskitTestCase): + """Test the DeviceComponent class.""" + + def test_str(self): + """Test string representation.""" + q1 = Qubit(1) + r1 = Resonator(1) + self.assertEqual("Q1", str(q1)) + self.assertEqual("R1", str(r1)) + + def test_to_component(self): + """Test converting string to component object.""" + q1 = to_component("Q1") + self.assertIsInstance(q1, Qubit) + self.assertEqual("Q1", str(q1)) + r1 = to_component("R1") + self.assertIsInstance(r1, Resonator) + self.assertEqual("R1", str(r1)) diff --git a/test/database_service/test_db_experiment_data.py b/test/database_service/test_db_experiment_data.py new file mode 100644 index 0000000000..16d4269337 --- /dev/null +++ b/test/database_service/test_db_experiment_data.py @@ -0,0 +1,724 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +# pylint: disable=missing-docstring + +"""Test ExperimentData.""" + +import os +from unittest import mock, skipIf +import copy +from random import randrange +import time +import threading +import json +import re +import uuid + +import numpy as np + +from qiskit.test import QiskitTestCase +from qiskit.test.mock import FakeMelbourne +from qiskit.result import Result +from qiskit.providers import JobV1 as Job +from qiskit.providers import JobStatus +from qiskit.tools.visualization import HAS_MATPLOTLIB +from qiskit_experiments.database_service import DbExperimentDataV1 as DbExperimentData +from qiskit_experiments.database_service import DatabaseServiceV1 +from qiskit_experiments.database_service.exceptions import ( + DbExperimentDataError, + DbExperimentEntryNotFound, + DbExperimentEntryExists, +) + + +class TestDbExperimentData(QiskitTestCase): + """Test the DbExperimentData class.""" + + def setUp(self): + super().setUp() + self.backend = FakeMelbourne() + + def test_stored_data_attributes(self): + """Test stored data attributes.""" + attrs = { + "job_ids": ["job1"], + "share_level": "public", + "figure_names": ["figure1"], + "notes": "some notes", + } + exp_data = DbExperimentData( + backend=self.backend, + experiment_type="qiskit_test", + experiment_id="1234", + tags=["tag1", "tag2"], + metadata={"foo": "bar"}, + **attrs, + ) + self.assertEqual(exp_data.backend.name(), self.backend.name()) + self.assertEqual(exp_data.experiment_type, "qiskit_test") + self.assertEqual(exp_data.experiment_id, "1234") + self.assertEqual(exp_data.tags(), ["tag1", "tag2"]) + self.assertEqual(exp_data.metadata(), {"foo": "bar"}) + for key, val in attrs.items(): + self.assertEqual(getattr(exp_data, key), val) + + def test_add_data_dict(self): + """Test add data in dictionary.""" + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + a_dict = {"counts": {"01": 518}} + dicts = [{"counts": {"00": 284}}, {"counts": {"00": 14}}] + + exp_data.add_data(a_dict) + exp_data.add_data(dicts) + self.assertEqual([a_dict] + dicts, exp_data.data()) + + def test_add_data_result(self): + """Test add result data.""" + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + a_result = self._get_job_result(1) + results = [self._get_job_result(2), self._get_job_result(3)] + + expected = [a_result.get_counts()] + for res in results: + expected.extend(res.get_counts()) + + exp_data.add_data(a_result) + exp_data.add_data(results) + self.assertEqual(expected, [sdata["counts"] for sdata in exp_data.data()]) + self.assertIn(a_result.job_id, exp_data.job_ids) + + def test_add_data_result_metadata(self): + """Test add result metadata.""" + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + result1 = self._get_job_result(1, has_metadata=False) + result2 = self._get_job_result(1, has_metadata=True) + + exp_data.add_data(result1) + exp_data.add_data(result2) + self.assertNotIn("metadata", exp_data.data(0)) + self.assertIn("metadata", exp_data.data(1)) + + def test_add_data_job(self): + """Test add job data.""" + a_job = mock.create_autospec(Job, instance=True) + a_job.result.return_value = self._get_job_result(3) + jobs = [] + for _ in range(2): + job = mock.create_autospec(Job, instance=True) + job.result.return_value = self._get_job_result(2) + jobs.append(job) + + expected = a_job.result().get_counts() + for job in jobs: + expected.extend(job.result().get_counts()) + + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + exp_data.add_data(a_job) + exp_data.add_data(jobs) + exp_data.block_for_results() + self.assertEqual(expected, [sdata["counts"] for sdata in exp_data.data()]) + self.assertIn(a_job.job_id(), exp_data.job_ids) + + def test_add_data_job_callback(self): + """Test add job data with callback.""" + + def _callback(_exp_data): + self.assertIsInstance(_exp_data, DbExperimentData) + self.assertEqual( + [dat["counts"] for dat in _exp_data.data()], a_job.result().get_counts() + ) + exp_data.add_figures(str.encode("hello world")) + exp_data.add_analysis_results(mock.MagicMock()) + nonlocal called_back + called_back = True + + a_job = mock.create_autospec(Job, instance=True) + a_job.result.return_value = self._get_job_result(2) + + called_back = False + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + exp_data.add_data(a_job, post_processing_callback=_callback) + exp_data.block_for_results() + self.assertTrue(called_back) + + def test_add_data_callback(self): + """Test add data with callback.""" + + def _callback(_exp_data): + self.assertIsInstance(_exp_data, DbExperimentData) + nonlocal called_back_count, expected_data, subtests + expected_data.extend(subtests[called_back_count][1]) + self.assertEqual([dat["counts"] for dat in _exp_data.data()], expected_data) + called_back_count += 1 + + a_result = self._get_job_result(1) + results = [self._get_job_result(1), self._get_job_result(1)] + a_dict = {"counts": {"01": 518}} + dicts = [{"counts": {"00": 284}}, {"counts": {"00": 14}}] + + subtests = [ + (a_result, [a_result.get_counts()]), + (results, [res.get_counts() for res in results]), + (a_dict, [a_dict["counts"]]), + (dicts, [dat["counts"] for dat in dicts]), + ] + + called_back_count = 0 + expected_data = [] + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + + for data, _ in subtests: + with self.subTest(data=data): + exp_data.add_data(data, post_processing_callback=_callback) + + self.assertEqual(len(subtests), called_back_count) + + def test_add_data_job_callback_kwargs(self): + """Test add job data with callback and additional arguments.""" + + def _callback(_exp_data, **kwargs): + self.assertIsInstance(_exp_data, DbExperimentData) + self.assertEqual({"foo": callback_kwargs}, kwargs) + nonlocal called_back + called_back = True + + a_job = mock.create_autospec(Job, instance=True) + a_job.result.return_value = self._get_job_result(2) + + called_back = False + callback_kwargs = "foo" + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + exp_data.add_data(a_job, _callback, foo=callback_kwargs) + exp_data.block_for_results() + self.assertTrue(called_back) + + def test_add_data_pending_post_processing(self): + """Test add job data while post processing is still running.""" + + def _callback(_exp_data, **kwargs): + kwargs["event"].wait(timeout=3) + + a_job = mock.create_autospec(Job, instance=True) + a_job.result.return_value = self._get_job_result(2) + + event = threading.Event() + self.addCleanup(event.set) + + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_data(a_job, _callback, event=event) + with self.assertLogs("qiskit_experiments", "WARNING"): + exp_data.add_data({"foo": "bar"}) + + def test_get_data(self): + """Test getting data.""" + data1 = [] + for _ in range(5): + data1.append({"counts": {"00": randrange(1024)}}) + results = self._get_job_result(3) + + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_data(data1) + exp_data.add_data(results) + self.assertEqual(data1[1], exp_data.data(1)) + self.assertEqual(data1[2:4], exp_data.data(slice(2, 4))) + self.assertEqual( + results.get_counts(), [sdata["counts"] for sdata in exp_data.data(results.job_id)] + ) + + def test_add_figure(self): + """Test adding a new figure.""" + hello_bytes = str.encode("hello world") + file_name = uuid.uuid4().hex + self.addCleanup(os.remove, file_name) + with open(file_name, "wb") as file: + file.write(hello_bytes) + + sub_tests = [ + ("file name", file_name, None), + ("file bytes", hello_bytes, None), + ("new name", hello_bytes, "hello_again.svg"), + ] + + for name, figure, figure_name in sub_tests: + with self.subTest(name=name): + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + fn = exp_data.add_figures(figure, figure_name) + self.assertEqual(hello_bytes, exp_data.figure(fn)) + + @skipIf(not HAS_MATPLOTLIB, "matplotlib not available.") + def test_add_figure_plot(self): + """Test adding a matplotlib figure.""" + # pylint: disable=import-error + import matplotlib.pyplot as plt + + figure, ax = plt.subplots() + ax.plot([1, 2, 3]) + + service = self._set_mock_service() + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + exp_data.add_figures(figure, save_figure=True) + self.assertEqual(figure, exp_data.figure(0)) + service.create_figure.assert_called_once() + _, kwargs = service.create_figure.call_args + self.assertIsInstance(kwargs["figure"], bytes) + + def test_add_figures(self): + """Test adding multiple new figures.""" + hello_bytes = [str.encode("hello world"), str.encode("hello friend")] + file_names = [uuid.uuid4().hex, uuid.uuid4().hex] + for idx, fn in enumerate(file_names): + self.addCleanup(os.remove, fn) + with open(fn, "wb") as file: + file.write(hello_bytes[idx]) + + sub_tests = [ + ("file names", file_names, None), + ("file bytes", hello_bytes, None), + ("new names", hello_bytes, ["hello1.svg", "hello2.svg"]), + ] + + for name, figures, figure_names in sub_tests: + with self.subTest(name=name): + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + added_names = exp_data.add_figures(figures, figure_names) + for idx, added_fn in enumerate(added_names): + self.assertEqual(hello_bytes[idx], exp_data.figure(added_fn)) + + def test_add_figure_overwrite(self): + """Test updating an existing figure.""" + hello_bytes = str.encode("hello world") + friend_bytes = str.encode("hello friend!") + + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + fn = exp_data.add_figures(hello_bytes) + with self.assertRaises(DbExperimentEntryExists): + exp_data.add_figures(friend_bytes, fn) + + exp_data.add_figures(friend_bytes, fn, overwrite=True) + self.assertEqual(friend_bytes, exp_data.figure(fn)) + + def test_add_figure_save(self): + """Test saving a figure in the database.""" + hello_bytes = str.encode("hello world") + service = self._set_mock_service() + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + exp_data.add_figures(hello_bytes, save_figure=True) + service.create_figure.assert_called_once() + _, kwargs = service.create_figure.call_args + self.assertEqual(kwargs["figure"], hello_bytes) + self.assertEqual(kwargs["experiment_id"], exp_data.experiment_id) + + def test_add_figure_bad_input(self): + """Test adding figures with bad input.""" + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + self.assertRaises(ValueError, exp_data.add_figures, ["foo", "bar"], ["name"]) + + def test_get_figure(self): + """Test getting figure.""" + exp_data = DbExperimentData(experiment_type="qiskit_test") + figure_template = "hello world {}" + name_template = "figure_{}" + for idx in range(3): + exp_data.add_figures( + str.encode(figure_template.format(idx)), figure_names=name_template.format(idx) + ) + idx = randrange(3) + expected_figure = str.encode(figure_template.format(idx)) + self.assertEqual(expected_figure, exp_data.figure(name_template.format(idx))) + self.assertEqual(expected_figure, exp_data.figure(idx)) + + file_name = uuid.uuid4().hex + self.addCleanup(os.remove, file_name) + exp_data.figure(idx, file_name) + with open(file_name, "rb") as file: + self.assertEqual(expected_figure, file.read()) + + def test_delete_figure(self): + """Test deleting a figure.""" + exp_data = DbExperimentData(experiment_type="qiskit_test") + id_template = "figure_{}" + for idx in range(3): + exp_data.add_figures(str.encode("hello world"), id_template.format(idx)) + + sub_tests = [(1, id_template.format(1)), (id_template.format(2), id_template.format(2))] + + for del_key, figure_name in sub_tests: + with self.subTest(del_key=del_key): + exp_data.delete_figure(del_key) + self.assertRaises(DbExperimentEntryNotFound, exp_data.figure, figure_name) + + def test_delayed_backend(self): + """Test initializing experiment data without a backend.""" + exp_data = DbExperimentData(experiment_type="qiskit_test") + self.assertIsNone(exp_data.backend) + self.assertIsNone(exp_data.service) + exp_data.save() + a_job = mock.create_autospec(Job, instance=True) + exp_data.add_data(a_job) + self.assertIsNotNone(exp_data.backend) + self.assertIsNotNone(exp_data.service) + + def test_different_backend(self): + """Test setting a different backend.""" + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + a_job = mock.create_autospec(Job, instance=True) + self.assertNotEqual(exp_data.backend, a_job.backend()) + with self.assertLogs("qiskit_experiments", "WARNING"): + exp_data.add_data(a_job) + + def test_add_get_analysis_result(self): + """Test adding and getting analysis results.""" + exp_data = DbExperimentData(experiment_type="qiskit_test") + results = [] + for idx in range(5): + res = mock.MagicMock() + res.result_id = idx + results.append(res) + exp_data.add_analysis_results(res) + + self.assertEqual(results, exp_data.analysis_results()) + self.assertEqual(results[1], exp_data.analysis_results(1)) + self.assertEqual(results[2:4], exp_data.analysis_results(slice(2, 4))) + self.assertEqual(results[4], exp_data.analysis_results(results[4].result_id)) + + def test_add_get_analysis_results(self): + """Test adding and getting a list of analysis results.""" + exp_data = DbExperimentData(experiment_type="qiskit_test") + results = [] + for idx in range(5): + res = mock.MagicMock() + res.result_id = idx + results.append(res) + exp_data.add_analysis_results(results) + + self.assertEqual(results, exp_data.analysis_results()) + + def test_delete_analysis_result(self): + """Test deleting analysis result.""" + exp_data = DbExperimentData(experiment_type="qiskit_test") + id_template = "result_{}" + for idx in range(3): + res = mock.MagicMock() + res.result_id = id_template.format(idx) + exp_data.add_analysis_results(res) + + subtests = [(0, id_template.format(0)), (id_template.format(2), id_template.format(2))] + for del_key, res_id in subtests: + with self.subTest(del_key=del_key): + exp_data.delete_analysis_result(del_key) + self.assertRaises(DbExperimentEntryNotFound, exp_data.analysis_results, res_id) + + def test_save(self): + """Test saving experiment data.""" + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + service = mock.create_autospec(DatabaseServiceV1, instance=True) + exp_data.service = service + exp_data.save() + service.create_experiment.assert_called_once() + _, kwargs = service.create_experiment.call_args + self.assertEqual(exp_data.experiment_id, kwargs["experiment_id"]) + exp_data.save() + service.update_experiment.assert_called_once() + _, kwargs = service.update_experiment.call_args + self.assertEqual(exp_data.experiment_id, kwargs["experiment_id"]) + + def test_save_all(self): + """Test saving all experiment related data.""" + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + service = mock.create_autospec(DatabaseServiceV1, instance=True) + exp_data.add_figures(str.encode("hello world")) + analysis_result = mock.MagicMock() + exp_data.add_analysis_results(analysis_result) + exp_data.service = service + exp_data.save_all() + service.create_experiment.assert_called_once() + service.create_figure.assert_called_once() + analysis_result.save.assert_called_once() + + def test_save_all_delete(self): + """Test saving all deletion.""" + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + service = mock.create_autospec(DatabaseServiceV1, instance=True) + exp_data.add_figures(str.encode("hello world")) + exp_data.add_analysis_results(mock.MagicMock()) + exp_data.delete_analysis_result(0) + exp_data.delete_figure(0) + exp_data.service = service + + exp_data.save_all() + service.create_experiment.assert_called_once() + service.delete_figure.assert_called_once() + service.delete_analysis_result.assert_called_once() + + def test_set_service_backend(self): + """Test setting service via backend.""" + mock_service = self._set_mock_service() + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + self.assertEqual(mock_service, exp_data.service) + + def test_set_service_job(self): + """Test setting service via adding a job.""" + mock_service = self._set_mock_service() + job = mock.create_autospec(Job, instance=True) + job.backend.return_value = self.backend + exp_data = DbExperimentData(experiment_type="qiskit_test") + self.assertIsNone(exp_data.service) + exp_data.add_data(job) + self.assertEqual(mock_service, exp_data.service) + + def test_set_service_direct(self): + """Test setting service directly.""" + exp_data = DbExperimentData(experiment_type="qiskit_test") + self.assertIsNone(exp_data.service) + mock_service = mock.MagicMock() + exp_data.service = mock_service + self.assertEqual(mock_service, exp_data.service) + + with self.assertRaises(DbExperimentDataError): + exp_data.service = mock_service + + def test_new_backend_has_service(self): + """Test changing backend doesn't change existing service.""" + orig_service = self._set_mock_service() + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + self.assertEqual(orig_service, exp_data.service) + + job = mock.create_autospec(Job, instance=True) + new_service = self._set_mock_service() + self.assertNotEqual(orig_service, new_service) + job.backend.return_value = self.backend + exp_data.add_data(job) + self.assertEqual(orig_service, exp_data.service) + + def test_auto_save(self): + """Test auto save.""" + service = self._set_mock_service() + exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") + exp_data.auto_save = True + + mock_result = mock.MagicMock() + exp_data.save() + + subtests = [ + # update function, update parameters, service called + (exp_data.add_analysis_results, (mock_result,), mock_result.save), + (exp_data.add_figures, (str.encode("hello world"),), service.create_figure), + (exp_data.delete_figure, (0,), service.delete_figure), + (exp_data.delete_analysis_result, (0,), service.delete_analysis_result), + (exp_data.set_tags, (["foo"],), None), + (exp_data.set_metadata, ({"foo": "bar"},), None), + (setattr, (exp_data, "notes", "foo"), None), + (setattr, (exp_data, "share_level", "hub"), None), + ] + + for func, params, called in subtests: + with self.subTest(func=func): + func(*params) + service.update_experiment.assert_called_once() + if called: + called.assert_called_once() + service.reset_mock() + + def test_status_job_pending(self): + """Test experiment status when job is pending.""" + job1 = mock.create_autospec(Job, instance=True) + job1.result.return_value = self._get_job_result(3) + job1.status.return_value = JobStatus.DONE + + event = threading.Event() + job2 = mock.create_autospec(Job, instance=True) + job2.result = lambda *args, **kwargs: event.wait() + job2.status.return_value = JobStatus.RUNNING + self.addCleanup(event.set) + + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_data(job1) + exp_data.add_data(job2, lambda *args, **kwargs: event.wait()) + self.assertEqual("RUNNING", exp_data.status()) + + def test_status_job_error(self): + """Test experiment status when job failed.""" + job1 = mock.create_autospec(Job, instance=True) + job1.result.return_value = self._get_job_result(3) + job1.status.return_value = JobStatus.DONE + + job2 = mock.create_autospec(Job, instance=True) + job2.status.return_value = JobStatus.ERROR + + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_data([job1, job2]) + self.assertEqual("ERROR", exp_data.status()) + + def test_status_post_processing(self): + """Test experiment status during post processing.""" + job = mock.create_autospec(Job, instance=True) + job.result.return_value = self._get_job_result(3) + + event = threading.Event() + self.addCleanup(event.set) + + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_data(job) + exp_data.add_data(job, lambda *args, **kwargs: event.wait()) + self.assertEqual("POST_PROCESSING", exp_data.status()) + + def test_status_post_processing_error(self): + """Test experiment status when post processing failed.""" + + def _post_processing(*args, **kwargs): + raise ValueError("Kaboom!") + + job = mock.create_autospec(Job, instance=True) + job.result.return_value = self._get_job_result(3) + + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_data(job) + exp_data.add_data(job, _post_processing) + exp_data.block_for_results() + self.assertEqual("ERROR", exp_data.status()) + + def test_status_done(self): + """Test experiment status when all jobs are done.""" + job = mock.create_autospec(Job, instance=True) + job.result.return_value = self._get_job_result(3) + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_data(job) + exp_data.add_data(job, lambda *args, **kwargs: time.sleep(1)) + exp_data.block_for_results() + self.assertEqual("DONE", exp_data.status()) + + def test_set_tags(self): + """Test updating experiment tags.""" + exp_data = DbExperimentData(experiment_type="qiskit_test", tags=["foo"]) + self.assertEqual(["foo"], exp_data.tags()) + exp_data.set_tags(["bar"]) + self.assertEqual(["bar"], exp_data.tags()) + + def test_set_metadata(self): + """Test updating experiment metadata.""" + exp_data = DbExperimentData(experiment_type="qiskit_test", metadata={"foo": "bar"}) + self.assertEqual({"foo": "bar"}, exp_data.metadata()) + exp_data.set_metadata({"bar": "foo"}) + self.assertEqual({"bar": "foo"}, exp_data.metadata()) + + def test_cancel_jobs(self): + """Test canceling experiment jobs.""" + exp_data = DbExperimentData(experiment_type="qiskit_test") + event = threading.Event() + self.addCleanup(event.set) + job = mock.create_autospec(Job, instance=True) + job.result = lambda *args, **kwargs: event.wait() + exp_data.add_data(job) + exp_data.cancel_jobs() + job.cancel.assert_called_once() + + def test_metadata_serialization(self): + """Test experiment metadata serialization.""" + metadata = {"complex": 2 + 3j, "numpy": np.zeros(2)} + exp_data = DbExperimentData(experiment_type="qiskit_test", metadata=metadata) + serialized = exp_data.serialize_metadata() + self.assertIsInstance(serialized, str) + self.assertTrue(json.loads(serialized)) + + deserialized = DbExperimentData.deserialize_metadata(serialized) + self.assertEqual(metadata["complex"], deserialized["complex"]) + self.assertEqual(metadata["numpy"].all(), deserialized["numpy"].all()) + + def test_errors(self): + """Test getting experiment error message.""" + + def _post_processing(*args, **kwargs): # pylint: disable=unused-argument + raise ValueError("Kaboom!") + + job1 = mock.create_autospec(Job, instance=True) + job1.job_id.return_value = "1234" + + job2 = mock.create_autospec(Job, instance=True) + job2.status.return_value = JobStatus.ERROR + job2.job_id.return_value = "5678" + + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_data(job1, _post_processing) + exp_data.add_data(job2) + exp_data.block_for_results() + self.assertEqual("ERROR", exp_data.status()) + self.assertTrue(re.match(r".*1234.*Kaboom!.*5678", exp_data.errors(), re.DOTALL)) + + def test_source(self): + """Test getting experiment source.""" + exp_data = DbExperimentData(experiment_type="qiskit_test") + self.assertIn("DbExperimentDataV1", exp_data.source["class"]) + self.assertTrue(exp_data.source["qiskit_version"]) + + def test_block_for_jobs(self): + """Test blocking for jobs.""" + + def _sleeper(*args, **kwargs): # pylint: disable=unused-argument + time.sleep(2) + nonlocal sleep_count + sleep_count += 1 + return self._get_job_result(1) + + sleep_count = 0 + job = mock.create_autospec(Job, instance=True) + job.result = _sleeper + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_data(job, _sleeper) + exp_data.block_for_results() + self.assertEqual(2, sleep_count) + + def test_additional_attr(self): + """Test additional experiment attributes.""" + exp_data = DbExperimentData(experiment_type="qiskit_test", foo="foo") + self.assertEqual("foo", exp_data.foo) + + def test_str(self): + """Test the string representation.""" + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_data(self._get_job_result(1)) + result = mock.MagicMock() + exp_data.add_analysis_results(result) + exp_data_str = str(exp_data) + self.assertIn(exp_data.experiment_type, exp_data_str) + self.assertIn(exp_data.experiment_id, exp_data_str) + self.assertIn(str(result), exp_data_str) + + def _get_job_result(self, circ_count, has_metadata=False): + """Return a job result with random counts.""" + job_result = { + "backend_name": self.backend.name(), + "backend_version": "1.1.1", + "qobj_id": "1234", + "job_id": "some_job_id", + "success": True, + "results": [], + } + circ_result_template = {"shots": 1024, "success": True, "data": {}} + + for _ in range(circ_count): + counts = randrange(1024) + circ_result = copy.copy(circ_result_template) + circ_result["data"] = {"counts": {"0x0": counts, "0x3": 1024 - counts}} + if has_metadata: + circ_result["header"] = {"metadata": {"meas_basis": "pauli"}} + job_result["results"].append(circ_result) + + return Result.from_dict(job_result) + + def _set_mock_service(self): + """Add a mock service to the backend.""" + mock_provider = mock.MagicMock() + self.backend._provider = mock_provider + mock_service = mock.create_autospec(DatabaseServiceV1, instance=True) + mock_provider.service.return_value = mock_service + return mock_service