diff --git a/.github/workflows/cron-staging.yml b/.github/workflows/cron-staging.yml index b6b39ce7d..2542107c8 100644 --- a/.github/workflows/cron-staging.yml +++ b/.github/workflows/cron-staging.yml @@ -39,27 +39,3 @@ jobs: pip install -U -c constraints.txt -r requirements-dev.txt - name: Run Tests run: make runtime_integration - experiment-integration: - name: experiment-integration - runs-on: macOS-latest - env: - QISKIT_IBM_STAGING_API_TOKEN: ${{ secrets.QISKIT_IBM_STAGING_API_TOKEN }} - QISKIT_IBM_STAGING_API_URL: ${{ secrets.QISKIT_IBM_STAGING_API_URL }} - QISKIT_IBM_STAGING_DEVICE: "ibmq_qasm_simulator" - QISKIT_IBM_USE_STAGING_CREDENTIALS: True - LOG_LEVEL: DEBUG - STREAM_LOG: True - QISKIT_IN_PARALLEL: True - steps: - - uses: actions/checkout@v2 - - name: Set up Python 3.8 - uses: actions/setup-python@v2 - with: - python-version: 3.8 - - name: Install Deps - run: | - python -m pip install --upgrade pip - pip install -c constraints.txt -e . - pip install -U -c constraints.txt -r requirements-dev.txt - - name: Run Tests - run: make experiment_integration diff --git a/Makefile b/Makefile index c56b2e23e..598c3a8f2 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ # that they have been altered from the originals. -.PHONY: lint style test mypy test1 test2 test3 runtime_integration experiment_integration +.PHONY: lint style test mypy test1 test2 test3 runtime_integration lint: pylint -rn qiskit_ibm test @@ -37,6 +37,3 @@ test3: runtime_integration: python -m unittest -v test/ibm/runtime/test_runtime_integration.py - -experiment_integration: - python -m unittest -v test/ibm/experiment/test_experiment_server_integration.py diff --git a/docs/apidocs/ibm-provider.rst b/docs/apidocs/ibm-provider.rst index 9ea1de20a..0964685c6 100644 --- a/docs/apidocs/ibm-provider.rst +++ b/docs/apidocs/ibm-provider.rst @@ -12,5 +12,4 @@ Qiskit IBM Quantum Provider API Reference ibm_jupyter ibm_utils ibm_random - ibm_experiment ibm_runtime diff --git a/docs/apidocs/ibm_experiment.rst b/docs/apidocs/ibm_experiment.rst deleted file mode 100644 index f25c38575..000000000 --- a/docs/apidocs/ibm_experiment.rst +++ /dev/null @@ -1,6 +0,0 @@ -.. _qiskit_ibm-experiment: - -.. automodule:: qiskit_ibm.experiment - :no-members: - :no-inherited-members: - :no-special-members: diff --git a/qiskit_ibm/api/clients/experiment.py b/qiskit_ibm/api/clients/experiment.py deleted file mode 100644 index 1f90eb6b5..000000000 --- a/qiskit_ibm/api/clients/experiment.py +++ /dev/null @@ -1,324 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Client for accessing IBM Quantum experiment services.""" - -import logging -from typing import List, Dict, Optional, Union - -from qiskit_ibm.credentials import Credentials - -from ..rest import Api -from ..session import RetrySession -from .base import BaseClient - -logger = logging.getLogger(__name__) - - -class ExperimentClient(BaseClient): - """Client for accessing IBM Quantum experiment services.""" - - def __init__( - self, - credentials: Credentials - ) -> None: - """ExperimentClient constructor. - - Args: - credentials: Account credentials. - """ - self._session = RetrySession(credentials.experiment_url, credentials.access_token, - **credentials.connection_parameters()) - self.base_api = Api(self._session) - - def experiments( - self, - limit: Optional[int], - marker: Optional[str], - backend_name: Optional[str], - experiment_type: Optional[str] = None, - start_time: Optional[List] = None, - device_components: Optional[List[str]] = None, - tags: Optional[List[str]] = None, - hub: Optional[str] = None, - group: Optional[str] = None, - project: Optional[str] = None, - exclude_public: Optional[bool] = False, - public_only: Optional[bool] = False, - exclude_mine: Optional[bool] = False, - mine_only: Optional[bool] = False, - parent_id: Optional[str] = None, - sort_by: Optional[str] = None - ) -> str: - """Retrieve experiments, with optional filtering. - - Args: - limit: Number of experiments to retrieve. - marker: Marker used to indicate where to start the next query. - backend_name: Name of the backend. - experiment_type: Experiment type. - start_time: A list of timestamps used to filter by experiment start time. - device_components: A list of device components used for filtering. - tags: Tags used for filtering. - hub: Filter by hub. - group: Filter by hub and group. - project: Filter by hub, group, and project. - exclude_public: Whether or not to exclude experiments with a public share level. - public_only: Whether or not to only return experiments with a public share level. - exclude_mine: Whether or not to exclude experiments where I am the owner. - mine_only: Whether or not to only return experiments where I am the owner. - parent_id: Filter by parent experiment ID. - sort_by: Sorting order. - - Returns: - A list of experiments and the marker, if applicable. - """ - resp = self.base_api.experiments( - limit=limit, - marker=marker, - backend_name=backend_name, - experiment_type=experiment_type, - start_time=start_time, - device_components=device_components, - tags=tags, - hub=hub, group=group, project=project, - exclude_public=exclude_public, public_only=public_only, - exclude_mine=exclude_mine, mine_only=mine_only, - parent_id=parent_id, - sort_by=sort_by) - return resp - - def experiment_get(self, experiment_id: str) -> str: - """Get a specific experiment. - - Args: - experiment_id: Experiment uuid. - - Returns: - Experiment data. - """ - return self.base_api.experiment(experiment_id).retrieve() - - def experiment_upload(self, data: str) -> Dict: - """Upload an experiment. - - Args: - data: Experiment data. - - Returns: - Experiment data. - """ - return self.base_api.experiment_upload(data) - - def experiment_update(self, experiment_id: str, new_data: str) -> Dict: - """Update an experiment. - - Args: - experiment_id: Experiment UUID. - new_data: New experiment data. - - Returns: - Experiment data. - """ - return self.base_api.experiment(experiment_id).update(new_data) - - def experiment_delete(self, experiment_id: str) -> Dict: - """Delete an experiment. - - Args: - experiment_id: Experiment UUID. - - Returns: - Experiment data. - """ - return self.base_api.experiment(experiment_id).delete() - - def experiment_plot_upload( - self, - experiment_id: str, - plot: Union[bytes, str], - plot_name: str, - sync_upload: bool = True - ) -> Dict: - """Upload an experiment plot. - - Args: - experiment_id: Experiment UUID. - plot: Plot file name or data to upload. - plot_name: Name of the plot. - sync_upload: By default the server will upload the plot file - to backend storage asynchronously. Set this to False to use - that behavior and not block the upload. - - Returns: - JSON response. - """ - return self.base_api.experiment( - experiment_id).upload_plot( - plot, plot_name, sync_upload=sync_upload) - - def experiment_plot_update( - self, - experiment_id: str, - plot: Union[bytes, str], - plot_name: str, - sync_upload: bool = True - ) -> Dict: - """Update an experiment plot. - - Args: - experiment_id: Experiment UUID. - plot: Plot file name or data to upload. - plot_name: Name of the plot. - sync_upload: By default the server will upload the plot file - to backend storage asynchronously. Set this to False to use - that behavior and not block the upload. - - Returns: - JSON response. - """ - return self.base_api.experiment_plot( - experiment_id, plot_name).update( - plot, sync_upload=sync_upload) - - def experiment_plot_get(self, experiment_id: str, plot_name: str) -> bytes: - """Retrieve an experiment plot. - - Args: - experiment_id: Experiment UUID. - plot_name: Name of the plot. - - Returns: - Retrieved experiment plot. - """ - return self.base_api.experiment_plot(experiment_id, plot_name).retrieve() - - def experiment_plot_delete(self, experiment_id: str, plot_file_name: str) -> None: - """Delete an experiment plot. - - Args: - experiment_id: Experiment UUID. - plot_file_name: Plot file name. - """ - self.base_api.experiment_plot(experiment_id, plot_file_name).delete() - - def experiment_devices(self) -> List: - """Return list of experiment devices. - - Returns: - A list of experiment devices. - """ - return self.base_api.experiment_devices()['devices'] - - def analysis_results( - self, - limit: Optional[int], - marker: Optional[str], - backend_name: Optional[str] = None, - device_components: Optional[List[str]] = None, - experiment_uuid: Optional[str] = None, - result_type: Optional[str] = None, - quality: Optional[Union[str, List[str]]] = None, - verified: Optional[bool] = None, - tags: Optional[List[str]] = None, - created_at: Optional[List] = None, - sort_by: Optional[str] = None - ) -> str: - """Return a list of analysis results. - - Args: - limit: Number of analysis results to retrieve. - marker: Marker used to indicate where to start the next query. - backend_name: Name of the backend. - device_components: A list of device components used for filtering. - experiment_uuid: Experiment UUID used for filtering. - result_type: Analysis result type used for filtering. - quality: Quality value used for filtering. - verified: Indicates whether this result has been verified. - tags: Filter by tags assigned to analysis results. - created_at: A list of timestamps used to filter by creation time. - sort_by: Indicates how the output should be sorted. - - Returns: - A list of analysis results and the marker, if applicable. - """ - resp = self.base_api.analysis_results( - limit=limit, - marker=marker, - backend_name=backend_name, - device_components=device_components, - experiment_uuid=experiment_uuid, - result_type=result_type, - quality=quality, - verified=verified, - tags=tags, - created_at=created_at, - sort_by=sort_by - ) - return resp - - def analysis_result_upload(self, result: str) -> Dict: - """Upload an analysis result. - - Args: - result: The analysis result to upload. - - Returns: - Analysis result data. - """ - return self.base_api.analysis_result_upload(result) - - def analysis_result_update(self, result_id: str, new_data: str) -> Dict: - """Update an analysis result. - - Args: - result_id: Analysis result ID. - new_data: New analysis result data. - - Returns: - Analysis result data. - """ - return self.base_api.analysis_result(result_id).update(new_data) - - def analysis_result_delete(self, result_id: str) -> Dict: - """Delete an analysis result. - - Args: - result_id: Analysis result ID. - - Returns: - Analysis result data. - """ - return self.base_api.analysis_result(result_id).delete() - - def analysis_result_get(self, result_id: str) -> str: - """Retrieve an analysis result. - - Args: - result_id: Analysis result ID. - - Returns: - Analysis result data. - """ - return self.base_api.analysis_result(result_id).get() - - def device_components(self, backend_name: Optional[str]) -> List[Dict]: - """Return device components for the backend. - - Args: - backend_name: Name of the backend. - - Returns: - A list of device components. - """ - resp = self.base_api.device_components(backend_name) - return resp['device_components'] diff --git a/qiskit_ibm/api/rest/experiment.py b/qiskit_ibm/api/rest/experiment.py deleted file mode 100644 index 44b4849d8..000000000 --- a/qiskit_ibm/api/rest/experiment.py +++ /dev/null @@ -1,173 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Experiment REST adapter.""" - -import logging -from typing import Dict, Union - -from .base import RestAdapterBase -from ..session import RetrySession - -logger = logging.getLogger(__name__) - - -class Experiment(RestAdapterBase): - """Rest adapter for experiment related endpoints.""" - - URL_MAP = { - 'self': '', - 'upload_plots': '/plots' - } - - def __init__(self, session: RetrySession, experiment_uuid: str, url_prefix: str = '') -> None: - """Experiment constructor. - - Args: - session: Session to be used in the adaptor. - experiment_uuid: UUID of the experiment. - url_prefix: URL prefix. - """ - super().__init__(session, '{}/experiments/{}'.format(url_prefix, experiment_uuid)) - - def retrieve(self) -> str: - """Retrieve the specific experiment. - - Returns: - Experiment data. - """ - url = self.get_url('self') - return self.session.get(url).text - - def update(self, experiment: str) -> Dict: - """Update the experiment. - - Args: - experiment: Experiment to update. - - Returns: - JSON response. - """ - url = self.get_url('self') - return self.session.put(url, data=experiment, headers=self._HEADER_JSON_CONTENT).json() - - def delete(self) -> Dict: - """Delete the experiment. - - Returns: - JSON response. - """ - url = self.get_url('self') - return self.session.delete(url).json() - - def upload_plot( - self, - plot: Union[bytes, str], - plot_name: str, - sync_upload: bool = True - ) -> Dict: - """Upload a plot for the experiment. - - Args: - plot: Plot file name or data to upload. - plot_name: Name of the plot. - sync_upload: By default the server will upload the plot file - to backend storage asynchronously. Set this to False to use - that behavior and not block the upload. - - Returns: - JSON response. - """ - url = self.get_url('upload_plots') - headers = { - 'x-sync-upload': str(sync_upload) - } - if isinstance(plot, str): - with open(plot, 'rb') as file: - data = {'plot': (plot_name, file)} - response = self.session.post(url, files=data, headers=headers).json() - else: - data = {'plot': (plot_name, plot)} # type: ignore[dict-item] - response = self.session.post(url, files=data, headers=headers).json() - - return response - - -class ExperimentPlot(RestAdapterBase): - """Rest adapter for experiment plot related endpoints.""" - - URL_MAP = { - 'self': '' - } - - def __init__( - self, - session: RetrySession, - experiment_uuid: str, - plot_name: str, - url_prefix: str = '') -> None: - """Experiment constructor. - - Args: - session: Session to be used in the adaptor. - experiment_uuid: UUID of the experiment. - plot_name: Name of the plot. - url_prefix: URL prefix. - """ - super().__init__(session, '{}/experiments/{}/plots/{}'.format( - url_prefix, experiment_uuid, plot_name)) - self.plot_name = plot_name - - def retrieve(self) -> bytes: - """Retrieve the specific experiment plot. - - Returns: - Plot content. - """ - url = self.get_url('self') - response = self.session.get(url) - return response.content - - def delete(self) -> None: - """Delete this experiment plot.""" - url = self.get_url('self') - self.session.delete(url) - - def update( - self, - plot: Union[bytes, str], - sync_upload: bool = True - ) -> Dict: - """Update an experiment plot. - - Args: - plot: Plot file name or data to upload. - sync_upload: By default the server will upload the plot file - to backend storage asynchronously. Set this to False to use - that behavior and not block the upload. - - Returns: - JSON response. - """ - url = self.get_url('self') - headers = { - 'x-sync-upload': str(sync_upload) - } - if isinstance(plot, str): - with open(plot, 'rb') as file: - data = {'plot': (self.plot_name, file)} - response = self.session.put(url, files=data, headers=headers).json() - else: - data = {'plot': (self.plot_name, plot)} # type: ignore[dict-item] - response = self.session.put(url, files=data, headers=headers).json() - - return response diff --git a/qiskit_ibm/api/rest/root.py b/qiskit_ibm/api/rest/root.py index 5b3f707e9..661d1194c 100644 --- a/qiskit_ibm/api/rest/root.py +++ b/qiskit_ibm/api/rest/root.py @@ -17,7 +17,6 @@ import json from .base import RestAdapterBase -from .experiment import Experiment, ExperimentPlot from .analysis_result import AnalysisResult logger = logging.getLogger(__name__) @@ -32,37 +31,11 @@ class Api(RestAdapterBase): 'hubs': '/Network', 'version': '/version', 'bookings': '/Network/bookings/v2', - 'experiments': '/experiments', 'experiment_devices': '/devices', 'analysis_results': '/analysis_results', 'device_components': '/device_components' } -# Function-specific rest adapters. - - def experiment(self, experiment_uuid: str) -> Experiment: - """Return an adapter for the experiment. - - Args: - experiment_uuid: UUID of the experiment. - - Returns: - The experiment adapter. - """ - return Experiment(self.session, experiment_uuid) - - def experiment_plot(self, experiment_uuid: str, plot_name: str) -> ExperimentPlot: - """ - - Args: - experiment_uuid: UUID of the experiment. - plot_name: Name of the experiment plot. - - Returns: - The experiment plot adapter. - """ - return ExperimentPlot(self.session, experiment_uuid, plot_name) - def analysis_result(self, analysis_result_id: str) -> AnalysisResult: """Return an adapter for the analysis result. @@ -144,111 +117,6 @@ def reservations(self) -> List: url = self.get_url('bookings') return self.session.get(url).json() - # Experiment-related public functions. - - def experiments( - self, - limit: Optional[int], - marker: Optional[str], - backend_name: Optional[str] = None, - experiment_type: Optional[str] = None, - start_time: Optional[List] = None, - device_components: Optional[List[str]] = None, - tags: Optional[List[str]] = None, - hub: Optional[str] = None, - group: Optional[str] = None, - project: Optional[str] = None, - exclude_public: Optional[bool] = False, - public_only: Optional[bool] = False, - exclude_mine: Optional[bool] = False, - mine_only: Optional[bool] = False, - parent_id: Optional[str] = None, - sort_by: Optional[str] = None - ) -> str: - """Return experiment data. - - Args: - limit: Number of experiments to retrieve. - marker: Marker used to indicate where to start the next query. - backend_name: Name of the backend. - experiment_type: Experiment type. - start_time: A list of timestamps used to filter by experiment start time. - device_components: A list of device components used for filtering. - tags: Tags used for filtering. - hub: Filter by hub. - group: Filter by hub and group. - project: Filter by hub, group, and project. - exclude_public: Whether or not to exclude experiments with a public share level. - public_only: Whether or not to only return experiments with a public share level. - exclude_mine: Whether or not to exclude experiments where I am the owner. - mine_only: Whether or not to only return experiments where I am the owner. - parent_id: Filter by parent experiment ID. - sort_by: Sorting order. - - Returns: - Response text. - """ - url = self.get_url('experiments') - params = {} # type: Dict[str, Any] - if backend_name: - params['device_name'] = backend_name - if experiment_type: - params['type'] = experiment_type - if start_time: - params['start_time'] = start_time - if device_components: - params['device_components'] = device_components - if tags: - params['tags'] = tags - if limit: - params['limit'] = limit - if marker: - params['marker'] = marker - if hub: - params['hub_id'] = hub - if group: - params['group_id'] = group - if project: - params['project_id'] = project - if parent_id: - params['parent_experiment_uuid'] = parent_id - if exclude_public: - params['visibility'] = '!public' - elif public_only: - params['visibility'] = 'public' - if exclude_mine: - params['owner'] = '!me' - elif mine_only: - params['owner'] = 'me' - if sort_by: - params['sort'] = sort_by - - return self.session.get(url, params=params).text - - def experiment_devices(self) -> Dict: - """Return experiment devices. - - Returns: - JSON response. - """ - url = self.get_url('experiment_devices') - raw_data = self.session.get(url).json() - return raw_data - - def experiment_upload(self, experiment: str) -> Dict: - """Upload an experiment. - - Args: - experiment: The experiment data to upload. - - Returns: - JSON response. - """ - url = self.get_url('experiments') - raw_data = self.session.post(url, data=experiment, - headers=self._HEADER_JSON_CONTENT).json() - return raw_data - def analysis_results( self, limit: Optional[int], diff --git a/qiskit_ibm/experiment/__init__.py b/qiskit_ibm/experiment/__init__.py deleted file mode 100644 index 973259176..000000000 --- a/qiskit_ibm/experiment/__init__.py +++ /dev/null @@ -1,65 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -""" -==================================================== -Experiment (:mod:`qiskit_ibm.experiment`) -==================================================== - -.. currentmodule:: qiskit_ibm.experiment - -Modules related to IBM Quantum experiment service. - -.. note:: - - This service is not available to all accounts. - -You can use the experiment service to query, upload, and retrieve -experiments, experiment figures, and analysis results. For example:: - - from qiskit_ibm import IBMProvider - provider = IBMProvider() - experiments = provider.experiment.experiments() - -All the available functions can be invoked using the `provider.experiment` -attribute, which is an instance of the -:class:`~qiskit_ibm.experiment.ExperimentService` class. - -This service is intended to be used in conjunction with the ``qiskit-experiments`` -package, which allows you to create different types of experiments (for example, -:class:`qiskit_experiments.library.characterization.T1`). - -Classes -======= - -.. autosummary:: - :toctree: ../stubs/ - - IBMExperimentService - ResultQuality - DeviceComponent - -Exceptions -========== - -.. autosummary:: - :toctree: ../stubs/ - - IBMExperimentError - IBMExperimentEntryExists - IBMExperimentEntryNotFound -""" - -from .ibm_experiment_service import IBMExperimentService -from .constants import ResultQuality -from .device_component import DeviceComponent -from .exceptions import IBMExperimentError, IBMExperimentEntryExists, IBMExperimentEntryNotFound diff --git a/qiskit_ibm/experiment/constants.py b/qiskit_ibm/experiment/constants.py deleted file mode 100644 index beb8b60b6..000000000 --- a/qiskit_ibm/experiment/constants.py +++ /dev/null @@ -1,47 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Experiment constants.""" - -import enum - - -class ExperimentShareLevel(enum.Enum): - """Possible values for experiment share level (visibility).""" - - PRIVATE = 'private' # The experiment is only visible to its owner - PROJECT = 'project' # The experiment is shared within its project - GROUP = 'group' # The experiment is shared within its group - HUB = 'hub' # The experiment is shared within its hub - PUBLIC = 'public' # The experiment is shared publicly regardless of provider - - -class ResultQuality(enum.Enum): - """Possible values for analysis result quality.""" - - BAD = "BAD" - GOOD = "GOOD" - UNKNOWN = "UNKNOWN" - - -RESULT_QUALITY_FROM_API = { - "Good": ResultQuality.GOOD, - "Bad": ResultQuality.BAD, - "No Information": ResultQuality.UNKNOWN -} - - -RESULT_QUALITY_TO_API = { - ResultQuality.GOOD: "Good", - ResultQuality.BAD: "Bad", - ResultQuality.UNKNOWN: "No Information", -} diff --git a/qiskit_ibm/experiment/device_component.py b/qiskit_ibm/experiment/device_component.py deleted file mode 100644 index f5eb8b1fd..000000000 --- a/qiskit_ibm/experiment/device_component.py +++ /dev/null @@ -1,76 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Device component classes.""" - -from abc import ABC, abstractmethod - - -class DeviceComponent(ABC): - """Class representing a device component.""" - - @abstractmethod - def __str__(self) -> str: - pass - - def __repr__(self) -> str: - return f"<{self.__class__.__name__}({str(self)})>" - - -class Qubit(DeviceComponent): - """Class representing a qubit device component.""" - - def __init__(self, index: int) -> None: - self._index = index - - def __str__(self) -> str: - return f"Q{self._index}" - - -class Resonator(DeviceComponent): - """Class representing a resonator device component.""" - - def __init__(self, index: int) -> None: - self._index = index - - def __str__(self) -> str: - return f"R{self._index}" - - -class UnknownComponent(DeviceComponent): - """Class representing unknown device component.""" - - def __init__(self, component: str) -> None: - self._component = component - - def __str__(self) -> str: - return self._component - - -def to_component(string: str) -> DeviceComponent: - """Convert the input string to a ``DeviceComponent`` instance. - - Args: - string: String to be converted. - - Returns: - A ``DeviceComponent`` instance. - - Raises: - ValueError: If input string is not a valid device component. - """ - if string.startswith("Q"): - return Qubit(int(string[1:])) - elif string.startswith("R"): - return Resonator(int(string[1:])) - else: - return UnknownComponent(string) diff --git a/qiskit_ibm/experiment/exceptions.py b/qiskit_ibm/experiment/exceptions.py deleted file mode 100644 index b50d72261..000000000 --- a/qiskit_ibm/experiment/exceptions.py +++ /dev/null @@ -1,28 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Exceptions related to IBM Quantum experiments.""" - -from ..exceptions import IBMError - - -class IBMExperimentError(IBMError): - """Base class for errors raised by the experiment service modules.""" - pass - - -class IBMExperimentEntryNotFound(IBMExperimentError): - """Errors raised when an experiment entry cannot be found.""" - - -class IBMExperimentEntryExists(IBMExperimentError): - """Errors raised when an experiment entry already exists.""" diff --git a/qiskit_ibm/experiment/ibm_experiment_service.py b/qiskit_ibm/experiment/ibm_experiment_service.py deleted file mode 100644 index 0932cd508..000000000 --- a/qiskit_ibm/experiment/ibm_experiment_service.py +++ /dev/null @@ -1,1298 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""IBM Quantum experiment service.""" - -import logging -import json -import copy -from typing import Optional, List, Dict, Union, Tuple, Any, Type -from datetime import datetime -from collections import defaultdict - -from qiskit.providers.exceptions import QiskitBackendNotFoundError - -from qiskit_ibm import ibm_provider # pylint: disable=unused-import - -from .constants import (ExperimentShareLevel, ResultQuality, - RESULT_QUALITY_FROM_API, RESULT_QUALITY_TO_API) -from .utils import map_api_error -from .device_component import DeviceComponent -from ..utils.converters import local_to_utc_str, utc_to_local -from ..api.clients.experiment import ExperimentClient -from ..api.exceptions import RequestsApiError -from ..ibm_backend import IBMRetiredBackend -from ..exceptions import IBMApiError -from ..credentials import store_preferences -from ..hub_group_project import HubGroupProject - -logger = logging.getLogger(__name__) - -SERVICE_NAME = 'experiment' - - -class IBMExperimentService: - """Provides experiment related services. - - This class is the main interface to invoke IBM Quantum - experiment service, which allows you to create, delete, update, query, and - retrieve experiments, experiment figures, and analysis results. The - ``experiment`` attribute of - :class:`~qiskit_ibm.ibm_provider.IBMProvider` is an - instance of this class, and the main syntax for using the service is - ``provider.experiment.``. For example:: - - from qiskit_ibm import IBMProvider - provider = IBMProvider() - - # Retrieve all experiments. - experiments = provider.experiment.experiments() - - # Retrieve experiments with filtering. - experiment_filtered = provider.experiment.experiments(backend_name='ibmq_athens') - - # Retrieve a specific experiment using its ID. - experiment = provider.experiment.experiment(EXPERIMENT_ID) - - # Upload a new experiment. - new_experiment_id = provider.experiment.create_experiment( - experiment_type="T1", - backend_name="ibmq_athens", - metadata={"qubits": 5} - ) - - # Update an experiment. - provider.experiment.update_experiment( - experiment_id=EXPERIMENT_ID, - share_level="Group" - ) - - # Delete an experiment. - provider.experiment.delete_experiment(EXPERIMENT_ID) - - Similar syntax applies to analysis results and experiment figures. - """ - - _default_preferences = {"auto_save": False} - - def __init__( - self, - provider: 'ibm_provider.IBMProvider', - hgp: HubGroupProject - ) -> None: - """IBMExperimentService constructor. - - Args: - provider: IBM Quantum account provider. - hgp: default hub/group/project to use for the service. - """ - super().__init__() - - self._provider = provider - self._default_hgp = hgp - self._api_client = ExperimentClient(hgp.credentials) - self._preferences = copy.deepcopy(self._default_preferences) - self._preferences.update(hgp.credentials.preferences.get('experiments', {})) - - def backends(self) -> List[Dict]: - """Return a list of backends that can be used for experiments. - - Returns: - A list of backends. - """ - return self._api_client.experiment_devices() - - def create_experiment( - self, - experiment_type: str, - backend_name: str, - metadata: Optional[Dict] = None, - experiment_id: Optional[str] = None, - parent_id: Optional[str] = None, - job_ids: Optional[List[str]] = None, - tags: Optional[List[str]] = None, - notes: Optional[str] = None, - share_level: Optional[Union[str, ExperimentShareLevel]] = None, - start_datetime: Optional[Union[str, datetime]] = None, - json_encoder: Type[json.JSONEncoder] = json.JSONEncoder, - hub: Optional[str] = None, - group: Optional[str] = None, - project: Optional[str] = None, - **kwargs: Any - ) -> str: - """Create a new experiment in the database. - - Args: - experiment_type: Experiment type. - backend_name: Name of the backend the experiment ran on. - metadata: Experiment metadata. - experiment_id: Experiment ID. It must be in the ``uuid4`` format. - One will be generated if not supplied. - parent_id: The experiment ID of the parent experiment. - The parent experiment must exist, must be on the same backend as the child, - and an experiment cannot be its own parent. - job_ids: IDs of experiment jobs. - tags: Tags to be associated with the experiment. - notes: Freeform notes about the experiment. - share_level: The level at which the experiment is shared. This determines who can - view the experiment (but not update it). This defaults to "private" - for new experiments. Possible values include: - - - private: The experiment is only visible to its owner (default) - - project: The experiment is shared within its project - - group: The experiment is shared within its group - - hub: The experiment is shared within its hub - - public: The experiment is shared publicly regardless of provider - start_datetime: Timestamp when the experiment started, in local time zone. - json_encoder: Custom JSON encoder to use to encode the experiment. - hub: Name of the hub. - group: Name of the group. - project: Name of the project. - kwargs: Additional experiment attributes that are not supported and will be ignored. - - Returns: - Experiment ID. - - Raises: - IBMExperimentEntryExists: If the experiment already exits. - IBMApiError: If the request to the server failed. - """ - # pylint: disable=arguments-differ - if kwargs: - logger.info("Keywords %s are not supported by IBM Quantum experiment service " - "and will be ignored.", - kwargs.keys()) - if not all([hub, group, project]) and self._default_hgp.get_backend(backend_name): - hgp = self._default_hgp - else: - hgp = self._provider._get_hgp(hub=hub, group=group, project=project, - backend_name=backend_name, service_name=SERVICE_NAME) - credentials = hgp.credentials - api_client = self._api_client if hgp == self._default_hgp else ExperimentClient(credentials) - data = { - 'type': experiment_type, - 'device_name': backend_name, - 'hub_id': credentials.hub, - 'group_id': credentials.group, - 'project_id': credentials.project - } - data.update(self._experiment_data_to_api(metadata=metadata, - experiment_id=experiment_id, - parent_id=parent_id, - job_ids=job_ids, - tags=tags, - notes=notes, - share_level=share_level, - start_dt=start_datetime)) - - with map_api_error(f"Experiment {experiment_id} already exists."): - response_data = api_client.experiment_upload(json.dumps(data, cls=json_encoder)) - return response_data['uuid'] - - def update_experiment( - self, - experiment_id: str, - metadata: Optional[Dict] = None, - job_ids: Optional[List[str]] = None, - notes: Optional[str] = None, - tags: Optional[List[str]] = None, - share_level: Optional[Union[str, ExperimentShareLevel]] = None, - end_datetime: Optional[Union[str, datetime]] = None, - json_encoder: Type[json.JSONEncoder] = json.JSONEncoder, - **kwargs: Any, - ) -> None: - """Update an existing experiment. - - Args: - experiment_id: Experiment ID. - metadata: Experiment metadata. - job_ids: IDs of experiment jobs. - notes: Freeform notes about the experiment. - tags: Tags to be associated with the experiment. - share_level: The level at which the experiment is shared. This determines who can - view the experiment (but not update it). This defaults to "private" - for new experiments. Possible values include: - - - private: The experiment is only visible to its owner (default) - - project: The experiment is shared within its project - - group: The experiment is shared within its group - - hub: The experiment is shared within its hub - - public: The experiment is shared publicly regardless of provider - - end_datetime: Timestamp for when the experiment ended, in local time. - json_encoder: Custom JSON encoder to use to encode the experiment. - kwargs: Additional experiment attributes that are not supported and will be ignored. - - Raises: - IBMExperimentEntryNotFound: If the experiment does not exist. - IBMApiError: If the request to the server failed. - """ - # pylint: disable=arguments-differ - if kwargs: - logger.info("Keywords %s are not supported by IBM Quantum experiment service " - "and will be ignored.", - kwargs.keys()) - - data = self._experiment_data_to_api(metadata=metadata, - job_ids=job_ids, - tags=tags, - notes=notes, - share_level=share_level, - end_dt=end_datetime) - if not data: - logger.warning("update_experiment() called with nothing to update.") - return - - with map_api_error(f"Experiment {experiment_id} not found."): - self._api_client.experiment_update(experiment_id, json.dumps(data, cls=json_encoder)) - - def _experiment_data_to_api( - self, - metadata: Optional[Dict] = None, - experiment_id: Optional[str] = None, - parent_id: Optional[str] = None, - job_ids: Optional[List[str]] = None, - tags: Optional[List[str]] = None, - notes: Optional[str] = None, - share_level: Optional[Union[str, ExperimentShareLevel]] = None, - start_dt: Optional[Union[str, datetime]] = None, - end_dt: Optional[Union[str, datetime]] = None, - ) -> Dict: - """Convert experiment data to API request data. - - Args: - metadata: Experiment metadata. - experiment_id: Experiment ID. - parent_id: Parent experiment ID - job_ids: IDs of experiment jobs. - tags: Tags to be associated with the experiment. - notes: Freeform notes about the experiment. - share_level: The level at which the experiment is shared. - start_dt: Experiment start time. - end_dt: Experiment end time. - - Returns: - API request data. - """ - data = {} # type: Dict[str, Any] - if metadata: - data['extra'] = metadata - if experiment_id: - data['uuid'] = experiment_id - if parent_id: - data['parent_experiment_uuid'] = parent_id - if share_level: - if isinstance(share_level, str): - share_level = ExperimentShareLevel(share_level.lower()) - data['visibility'] = share_level.value - if tags: - data['tags'] = tags - if job_ids: - data['jobs'] = job_ids - if notes: - data['notes'] = notes - if start_dt: - data['start_time'] = local_to_utc_str(start_dt) - if end_dt: - data['end_time'] = local_to_utc_str(end_dt) - return data - - def experiment( - self, - experiment_id: str, - json_decoder: Type[json.JSONDecoder] = json.JSONDecoder - ) -> Dict: - """Retrieve a previously stored experiment. - - Args: - experiment_id: Experiment ID. - json_decoder: Custom JSON decoder to use to decode the retrieved experiment. - - Returns: - Retrieved experiment data. - - Raises: - IBMExperimentEntryNotFound: If the experiment does not exist. - IBMApiError: If the request to the server failed. - """ - with map_api_error(f"Experiment {experiment_id} not found."): - raw_data = self._api_client.experiment_get(experiment_id) - - return self._api_to_experiment_data(json.loads(raw_data, cls=json_decoder)) - - def experiments( - self, - limit: Optional[int] = 10, - json_decoder: Type[json.JSONDecoder] = json.JSONDecoder, - device_components: Optional[List[Union[str, DeviceComponent]]] = None, - device_components_operator: Optional[str] = None, - experiment_type: Optional[str] = None, - experiment_type_operator: Optional[str] = None, - backend_name: Optional[str] = None, - tags: Optional[List[str]] = None, - tags_operator: Optional[str] = "OR", - start_datetime_after: Optional[datetime] = None, - start_datetime_before: Optional[datetime] = None, - hub: Optional[str] = None, - group: Optional[str] = None, - project: Optional[str] = None, - exclude_public: Optional[bool] = False, - public_only: Optional[bool] = False, - exclude_mine: Optional[bool] = False, - mine_only: Optional[bool] = False, - parent_id: Optional[str] = None, - sort_by: Optional[Union[str, List[str]]] = None, - **filters: Any - ) -> List[Dict]: - """Retrieve all experiments, with optional filtering. - - By default, results returned are as inclusive as possible. For example, - if you don't specify any filters, all experiments visible to you - are returned. This includes your own experiments as well as - those shared with you, from all providers you have access to - (not just from the provider you used to invoke this experiment service). - - Args: - limit: Number of experiments to retrieve. ``None`` indicates no limit. - json_decoder: Custom JSON decoder to use to decode the retrieved experiments. - device_components: Filter by device components. - device_components_operator: Operator used when filtering by device components. - Valid values are ``None`` and "contains": - - * If ``None``, an analysis result's device components must match - exactly for it to be included. - * If "contains" is specified, an analysis result's device components - must contain at least the values specified by the `device_components` - filter. - - experiment_type: Experiment type used for filtering. - experiment_type_operator: Operator used when filtering by experiment type. - Valid values are ``None`` and "like": - - * If ``None`` is specified, an experiment's type value must - match exactly for it to be included. - * If "like" is specified, an experiment's type value must - contain the value specified by `experiment_type`. For example, - ``experiment_type="foo", experiment_type_operator="like"`` will match - both ``foo1`` and ``1foo``. - backend_name: Backend name used for filtering. - tags: Filter by tags assigned to experiments. - tags_operator: Logical operator to use when filtering by job tags. Valid - values are "AND" and "OR": - - * If "AND" is specified, then an experiment must have all of the tags - specified in `tags` to be included. - * If "OR" is specified, then an experiment only needs to have any - of the tags specified in `tags` to be included. - - start_datetime_after: Filter by the given start timestamp, in local time. - This is used to find experiments whose start date/time is after - (greater than or equal to) this local timestamp. - start_datetime_before: Filter by the given start timestamp, in local time. - This is used to find experiments whose start date/time is before - (less than or equal to) this local timestamp. - hub: Filter by hub. - group: Filter by hub and group. `hub` must also be specified if `group` is. - project: Filter by hub, group, and project. `hub` and `group` must also be - specified if `project` is. - exclude_public: If ``True``, experiments with ``share_level=public`` - (that is, experiments visible to all users) will not be returned. - Cannot be ``True`` if `public_only` is ``True``. - public_only: If ``True``, only experiments with ``share_level=public`` - (that is, experiments visible to all users) will be returned. - Cannot be ``True`` if `exclude_public` is ``True``. - exclude_mine: If ``True``, experiments where I am the owner will not be returned. - Cannot be ``True`` if `mine_only` is ``True``. - mine_only: If ``True``, only experiments where I am the owner will be returned. - Cannot be ``True`` if `exclude_mine` is ``True``. - parent_id: Filter experiments by this parent experiment ID. - sort_by: Specifies how the output should be sorted. This can be a single sorting - option or a list of options. Each option should contain a sort key - and a direction, separated by a semicolon. Valid sort keys are - "start_datetime" and "experiment_type". - Valid directions are "asc" for ascending or "desc" for descending. - For example, ``sort_by=["experiment_type:asc", "start_datetime:desc"]`` will - return an output list that is first sorted by experiment type in - ascending order, then by start datetime by descending order. - By default, experiments are sorted by ``start_datetime`` - descending and ``experiment_id`` ascending. - **filters: Additional filtering keywords that are not supported and will be ignored. - - Returns: - A list of experiments. Each experiment is a dictionary containing the - retrieved experiment data. - - Raises: - ValueError: If an invalid parameter value is specified. - IBMApiError: If the request to the server failed. - """ - # pylint: disable=arguments-differ - if filters: - logger.info("Keywords %s are not supported by IBM Quantum experiment service " - "and will be ignored.", - filters.keys()) - - if limit is not None and (not isinstance(limit, int) or limit <= 0): # type: ignore - raise ValueError(f"{limit} is not a valid `limit`, which has to be a positive integer.") - - pgh_text = ['project', 'group', 'hub'] - pgh_val = [project, group, hub] - for idx, val in enumerate(pgh_val): - if val is not None and None in pgh_val[idx+1:]: - raise ValueError(f"If {pgh_text[idx]} is specified, " - f"{' and '.join(pgh_text[idx+1:])} must also be specified.") - - start_time_filters = [] - if start_datetime_after: - st_filter = 'ge:{}'.format(local_to_utc_str(start_datetime_after)) - start_time_filters.append(st_filter) - if start_datetime_before: - st_filter = 'le:{}'.format(local_to_utc_str(start_datetime_before)) - start_time_filters.append(st_filter) - - if exclude_public and public_only: - raise ValueError('exclude_public and public_only cannot both be True') - - if exclude_mine and mine_only: - raise ValueError('exclude_mine and mine_only cannot both be True') - - converted = self._filtering_to_api( - tags=tags, - tags_operator=tags_operator, - sort_by=sort_by, - sort_map={"start_datetime": "start_time", - "experiment_type": "type"}, - device_components=device_components, - device_components_operator=device_components_operator, - item_type=experiment_type, - item_type_operator=experiment_type_operator - ) - - experiments = [] - marker = None - while limit is None or limit > 0: - with map_api_error(f"Request failed."): - response = self._api_client.experiments( - limit=limit, - marker=marker, - backend_name=backend_name, - experiment_type=converted["type"], - start_time=start_time_filters, - device_components=converted["device_components"], - tags=converted["tags"], - hub=hub, group=group, project=project, - exclude_public=exclude_public, - public_only=public_only, - exclude_mine=exclude_mine, - mine_only=mine_only, - parent_id=parent_id, - sort_by=converted["sort_by"]) - raw_data = json.loads(response, cls=json_decoder) - marker = raw_data.get('marker') - for exp in raw_data['experiments']: - experiments.append(self._api_to_experiment_data(exp)) - if limit: - limit -= len(raw_data['experiments']) - if not marker: # No more experiments to return. - break - return experiments - - def _api_to_experiment_data( - self, - raw_data: Dict, - ) -> Dict: - """Convert API response to experiment data. - - Args: - raw_data: API response - - Returns: - Converted experiment data. - """ - backend_name = raw_data['device_name'] - try: - backend = self._provider.get_backend(backend_name) - except QiskitBackendNotFoundError: - backend = IBMRetiredBackend.from_name(backend_name=backend_name, - provider=self._provider, - credentials=self._default_hgp.credentials, - api=None) - extra_data: Dict[str, Any] = {} - self._convert_dt(raw_data.get('created_at', None), extra_data, 'creation_datetime') - self._convert_dt(raw_data.get('start_time', None), extra_data, 'start_datetime') - self._convert_dt(raw_data.get('end_time', None), extra_data, 'end_datetime') - self._convert_dt(raw_data.get('updated_at', None), extra_data, 'updated_datetime') - - out_dict = { - "experiment_type": raw_data['type'], - "backend": backend, - "experiment_id": raw_data['uuid'], - "parent_id": raw_data.get('parent_experiment_uuid', None), - "tags": raw_data.get("tags", None), - "job_ids": raw_data['jobs'], - "share_level": raw_data.get("visibility", None), - "metadata": raw_data.get("extra", None), - "figure_names": raw_data.get("plot_names", None), - "notes": raw_data.get("notes", ""), - "hub": raw_data.get("hub_id", ""), - "group": raw_data.get("group_id", ""), - "project": raw_data.get("project_id", ""), - "owner": raw_data.get("owner", ""), - **extra_data - } - return out_dict - - def _convert_dt( - self, - timestamp: Optional[str], - data: Dict, - field_name: str - ) -> None: - """Convert input timestamp. - - Args: - timestamp: Timestamp to be converted. - data: Data used to stored the converted timestamp. - field_name: Name used to store the converted timestamp. - """ - if not timestamp: - return - data[field_name] = utc_to_local(timestamp) - - def delete_experiment(self, experiment_id: str) -> None: - """Delete an experiment. - - Args: - experiment_id: Experiment ID. - - Note: - This method prompts for confirmation and requires a response before proceeding. - - Raises: - IBMApiError: If the request to the server failed. - """ - confirmation = input('\nAre you sure you want to delete the experiment? ' - 'Results and plots for the experiment will also be deleted. [y/N]: ') - if confirmation not in ('y', 'Y'): - return - - try: - self._api_client.experiment_delete(experiment_id) - except RequestsApiError as api_err: - if api_err.status_code == 404: - logger.warning("Experiment %s not found.", experiment_id) - else: - raise IBMApiError(f"Failed to process the request: {api_err}") from None - - def create_analysis_result( - self, - experiment_id: str, - result_data: Dict, - result_type: str, - device_components: Optional[Union[List[Union[str, DeviceComponent]], - str, DeviceComponent]] = None, - tags: Optional[List[str]] = None, - quality: Union[ResultQuality, str] = ResultQuality.UNKNOWN, - verified: bool = False, - result_id: Optional[str] = None, - chisq: Optional[float] = None, - json_encoder: Type[json.JSONEncoder] = json.JSONEncoder, - **kwargs: Any, - ) -> str: - """Create a new analysis result in the database. - - Args: - experiment_id: ID of the experiment this result is for. - result_data: Result data to be stored. - result_type: Analysis result type. - device_components: Target device components, such as qubits. - tags: Tags to be associated with the analysis result. - quality: Quality of this analysis. - verified: Whether the result quality has been verified. - result_id: Analysis result ID. It must be in the ``uuid4`` format. - One will be generated if not supplied. - chisq: chi^2 decimal value of the fit. - json_encoder: Custom JSON encoder to use to encode the analysis result. - kwargs: Additional analysis result attributes that are not supported - and will be ignored. - - Returns: - Analysis result ID. - - Raises: - IBMExperimentEntryExists: If the analysis result already exits. - IBMApiError: If the request to the server failed. - """ - # pylint: disable=arguments-differ - if kwargs: - logger.info("Keywords %s are not supported by IBM Quantum experiment service " - "and will be ignored.", - kwargs.keys()) - - components = [] - if device_components: - if not isinstance(device_components, list): - device_components = [device_components] - for comp in device_components: - components.append(str(comp)) - - if isinstance(quality, str): - quality = ResultQuality(quality.upper()) - - request = self._analysis_result_to_api( - experiment_id=experiment_id, - device_components=components, - data=result_data, - result_type=result_type, - tags=tags, - quality=quality, - verified=verified, - result_id=result_id, - chisq=chisq - ) - with map_api_error(f"Analysis result {result_id} already exists."): - response = self._api_client.analysis_result_upload( - json.dumps(request, cls=json_encoder)) - return response['uuid'] - - def update_analysis_result( - self, - result_id: str, - result_data: Optional[Dict] = None, - tags: Optional[List[str]] = None, - quality: Union[ResultQuality, str] = None, - verified: bool = None, - chisq: Optional[float] = None, - json_encoder: Type[json.JSONEncoder] = json.JSONEncoder, - **kwargs: Any, - ) -> None: - """Update an existing analysis result. - - Args: - result_id: Analysis result ID. - result_data: Result data to be stored. - quality: Quality of this analysis. - verified: Whether the result quality has been verified. - tags: Tags to be associated with the analysis result. - chisq: chi^2 decimal value of the fit. - json_encoder: Custom JSON encoder to use to encode the analysis result. - kwargs: Additional analysis result attributes that are not supported - and will be ignored. - - Raises: - IBMExperimentEntryNotFound: If the analysis result does not exist. - IBMApiError: If the request to the server failed. - """ - # pylint: disable=arguments-differ - if kwargs: - logger.info("Keywords %s are not supported by IBM Quantum experiment service " - "and will be ignored.", - kwargs.keys()) - - if isinstance(quality, str): - quality = ResultQuality(quality.upper()) - - request = self._analysis_result_to_api(data=result_data, - tags=tags, - quality=quality, - verified=verified, - chisq=chisq) - with map_api_error(f"Analysis result {result_id} not found."): - self._api_client.analysis_result_update( - result_id, json.dumps(request, cls=json_encoder)) - - def _analysis_result_to_api( - self, - experiment_id: Optional[str] = None, - device_components: Optional[List[str]] = None, - data: Optional[Dict] = None, - result_type: Optional[str] = None, - tags: Optional[List[str]] = None, - quality: Optional[ResultQuality] = None, - verified: Optional[bool] = None, - result_id: Optional[str] = None, - chisq: Optional[float] = None, - ) -> Dict: - """Convert analysis result fields to server format. - - Args: - experiment_id: ID of the experiment this result is for. - data: Result data to be stored. - result_type: Analysis result type. - device_components: Target device components, such as qubits. - tags: Tags to be associated with the analysis result. - quality: Quality of this analysis. - verified: Whether the result quality has been verified. - result_id: Analysis result ID. It must be in the ``uuid4`` format. - One will be generated if not supplied. - chisq: chi^2 decimal value of the fit. - - Returns: - API request data. - """ - out = {} # type: Dict[str, Any] - if experiment_id: - out["experiment_uuid"] = experiment_id - if device_components: - out["device_components"] = device_components - if data: - out["fit"] = data - if result_type: - out["type"] = result_type - if tags: - out["tags"] = tags - if quality: - out["quality"] = RESULT_QUALITY_TO_API[quality] - if verified is not None: - out["verified"] = verified - if result_id: - out["uuid"] = result_id - if chisq: - out["chisq"] = chisq - return out - - def analysis_result( - self, - result_id: str, - json_decoder: Type[json.JSONDecoder] = json.JSONDecoder - ) -> Dict: - """Retrieve a previously stored analysis result. - - Args: - result_id: Analysis result ID. - json_decoder: Custom JSON decoder to use to decode the retrieved analysis result. - - Returns: - Retrieved analysis result. - - Raises: - IBMExperimentEntryNotFound: If the analysis result does not exist. - IBMApiError: If the request to the server failed. - """ - with map_api_error(f"Analysis result {result_id} not found."): - raw_data = self._api_client.analysis_result_get(result_id) - - return self._api_to_analysis_result(json.loads(raw_data, cls=json_decoder)) - - def analysis_results( - self, - limit: Optional[int] = 10, - json_decoder: Type[json.JSONDecoder] = json.JSONDecoder, - device_components: Optional[List[Union[str, DeviceComponent]]] = None, - device_components_operator: Optional[str] = None, - experiment_id: Optional[str] = None, - result_type: Optional[str] = None, - result_type_operator: Optional[str] = None, - backend_name: Optional[str] = None, - quality: Optional[Union[List[Union[ResultQuality, str]], ResultQuality, str]] = None, - verified: Optional[bool] = None, - tags: Optional[List[str]] = None, - tags_operator: Optional[str] = "OR", - creation_datetime_after: Optional[datetime] = None, - creation_datetime_before: Optional[datetime] = None, - sort_by: Optional[Union[str, List[str]]] = None, - **filters: Any - ) -> List[Dict]: - """Retrieve all analysis results, with optional filtering. - - Args: - limit: Number of analysis results to retrieve. - json_decoder: Custom JSON decoder to use to decode the retrieved analysis results. - device_components: Filter by device components. - device_components_operator: Operator used when filtering by device components. - Valid values are ``None`` and "contains": - - * If ``None``, an analysis result's device components must match - exactly for it to be included. - * If "contains" is specified, an analysis result's device components - must contain at least the values specified by the `device_components` - filter. - - experiment_id: Experiment ID used for filtering. - result_type: Analysis result type used for filtering. - result_type_operator: Operator used when filtering by result type. - Valid values are ``None`` and "like": - - * If ``None`` is specified, an analysis result's type value must - match exactly for it to be included. - * If "like" is specified, an analysis result's type value must - contain the value specified by `result_type`. For example, - ``result_type="foo", result_type_operator="like"`` will match - both ``foo1`` and ``1foo``. - - backend_name: Backend name used for filtering. - quality: Quality value used for filtering. If a list is given, analysis results - whose quality value is in the list will be included. - verified: Indicates whether this result has been verified.. - tags: Filter by tags assigned to analysis results. This can be used - with `tags_operator` for granular filtering. - tags_operator: Logical operator to use when filtering by tags. Valid - values are "AND" and "OR": - - * If "AND" is specified, then an analysis result must have all of the tags - specified in `tags` to be included. - * If "OR" is specified, then an analysis result only needs to have any - of the tags specified in `tags` to be included. - - creation_datetime_after: Filter by the given creation timestamp, in local time. - This is used to find analysis results whose creation date/time is after - (greater than or equal to) this local timestamp. - creation_datetime_before: Filter by the given creation timestamp, in local time. - This is used to find analysis results whose creation date/time is before - (less than or equal to) this local timestamp. - sort_by: Specifies how the output should be sorted. This can be a single sorting - option or a list of options. Each option should contain a sort key - and a direction. Valid sort keys are "creation_datetime", "device_components", - and "result_type". Valid directions are "asc" for ascending or "desc" for - descending. - For example, ``sort_by=["result_type: asc", "creation_datetime:desc"]`` will - return an output list that is first sorted by result type in - ascending order, then by creation datetime by descending order. - By default, analysis results are sorted by ``creation_datetime`` - descending and ``result_id`` ascending. - - **filters: Additional filtering keywords that are not supported and will be ignored. - - Returns: - A list of analysis results. Each analysis result is a dictionary - containing the retrieved analysis result. - - Raises: - ValueError: If an invalid parameter value is specified. - IBMApiError: If the request to the server failed. - """ - # pylint: disable=arguments-differ - if filters: - logger.info("Keywords %s are not supported by IBM Quantum experiment service " - "and will be ignored.", - filters.keys()) - - if limit is not None and (not isinstance(limit, int) or limit <= 0): # type: ignore - raise ValueError(f"{limit} is not a valid `limit`, which has to be a positive integer.") - - quality = self._quality_filter_to_api(quality) - - created_at_filters = [] - if creation_datetime_after: - ca_filter = 'ge:{}'.format(local_to_utc_str(creation_datetime_after)) - created_at_filters.append(ca_filter) - if creation_datetime_before: - ca_filter = 'le:{}'.format(local_to_utc_str(creation_datetime_before)) - created_at_filters.append(ca_filter) - - converted = self._filtering_to_api( - tags=tags, - tags_operator=tags_operator, - sort_by=sort_by, - sort_map={"creation_datetime": "created_at", - "device_components": "device_components", - "result_type": "type"}, - device_components=device_components, - device_components_operator=device_components_operator, - item_type=result_type, - item_type_operator=result_type_operator - ) - - results = [] - marker = None - while limit is None or limit > 0: - with map_api_error("Request failed."): - response = self._api_client.analysis_results( - limit=limit, - marker=marker, - backend_name=backend_name, - device_components=converted["device_components"], - experiment_uuid=experiment_id, - result_type=converted["type"], - quality=quality, - verified=verified, - tags=converted["tags"], - created_at=created_at_filters, - sort_by=converted["sort_by"] - ) - raw_data = json.loads(response, cls=json_decoder) - marker = raw_data.get('marker') - for result in raw_data['analysis_results']: - results.append(self._api_to_analysis_result(result)) - if limit: - limit -= len(raw_data['analysis_results']) - if not marker: # No more experiments to return. - break - return results - - def _quality_filter_to_api( - self, - quality: Optional[Union[List[Union[ResultQuality, str]], ResultQuality, str]] = None, - ) -> Optional[Union[str, List[str]]]: - """Convert quality filter to server format.""" - if not quality: - return None - if not isinstance(quality, list): - quality = [quality] - - api_quals = [] - for qual in quality: - if isinstance(qual, str): - qual = ResultQuality(qual.upper()) - api_qual = RESULT_QUALITY_TO_API[qual] - if api_qual not in api_quals: - api_quals.append(api_qual) - - if len(api_quals) == 1: - return api_quals[0] - if len(api_quals) == len(ResultQuality): - return None - - return "in:" + ",".join(api_quals) - - def _filtering_to_api( - self, - tags: Optional[List[str]] = None, - tags_operator: Optional[str] = "OR", - sort_by: Optional[Union[str, List[str]]] = None, - sort_map: Optional[Dict] = None, - device_components: Optional[List[Union[str, DeviceComponent]]] = None, - device_components_operator: Optional[str] = None, - item_type: Optional[str] = None, - item_type_operator: Optional[str] = None, - ) -> Dict: - """Convert filtering inputs to server format. - - Args: - tags: Filtering by tags. - tags_operator: Tags operator. - sort_by: Specifies how the output should be sorted. - sort_map: Sort key to API key mapping. - device_components: Filter by device components. - device_components_operator: Device component operator. - item_type: Item type used for filtering. - item_type_operator: Operator used when filtering by type. - - Returns: - A dictionary of mapped filters. - - Raises: - ValueError: If an input key is invalid. - """ - tags_filter = None - if tags: - if tags_operator.upper() == 'OR': - tags_filter = 'any:' + ','.join(tags) - elif tags_operator.upper() == 'AND': - tags_filter = 'contains:' + ','.join(tags) - else: - raise ValueError('{} is not a valid `tags_operator`. Valid values are ' - '"AND" and "OR".'.format(tags_operator)) - - sort_list = [] - if sort_by: - if not isinstance(sort_by, list): - sort_by = [sort_by] - for sorter in sort_by: - key, direction = sorter.split(":") - key = key.lower() - if key not in sort_map: - raise ValueError(f'"{key}" is not a valid sort key. ' - f'Valid sort keys are {sort_map.keys()}') - key = sort_map[key] - if direction not in ["asc", "desc"]: - raise ValueError(f'"{direction}" is not a valid sorting direction.' - f'Valid directions are "asc" and "desc".') - sort_list.append(f"{key}:{direction}") - sort_by = ",".join(sort_list) - - if device_components: - device_components = [str(comp) for comp in device_components] - if device_components_operator: - if device_components_operator != "contains": - raise ValueError(f'{device_components_operator} is not a valid ' - f'device_components_operator value. Valid values ' - f'are ``None`` and "contains"') - device_components = \ - "contains:" + ','.join(device_components) # type: ignore - - if item_type and item_type_operator: - if item_type_operator != "like": - raise ValueError(f'"{item_type_operator}" is not a valid type operator value. ' - f'Valid values are ``None`` and "like".') - item_type = "like:" + item_type - - return {"tags": tags_filter, - "sort_by": sort_by, - "device_components": device_components, - "type": item_type} - - def _api_to_analysis_result( - self, - raw_data: Dict, - ) -> Dict: - """Map API response to a dictionary representing an analysis result. - - Args: - raw_data: API response data. - - Returns: - Converted analysis result data. - """ - extra_data = {} - - chisq = raw_data.get('chisq', None) - if chisq: - extra_data['chisq'] = chisq - - backend_name = raw_data['device_name'] - if backend_name: - extra_data['backend_name'] = backend_name - - quality = raw_data.get('quality', None) - if quality: - quality = RESULT_QUALITY_FROM_API[quality] - - self._convert_dt(raw_data.get('created_at', None), extra_data, 'creation_datetime') - self._convert_dt(raw_data.get('updated_at', None), extra_data, 'updated_datetime') - - out_dict = { - "result_data": raw_data.get('fit', {}), - "result_type": raw_data.get('type', None), - "device_components": raw_data.get('device_components', []), - "experiment_id": raw_data.get('experiment_uuid'), - "result_id": raw_data.get('uuid', None), - "quality": quality, - "verified": raw_data.get('verified', False), - "tags": raw_data.get('tags', []), - "service": self, - **extra_data - } - return out_dict - - def delete_analysis_result( - self, - result_id: str - ) -> None: - """Delete an analysis result. - - Args: - result_id: Analysis result ID. - - Note: - This method prompts for confirmation and requires a response before proceeding. - - Raises: - IBMApiError: If the request to the server failed. - """ - confirmation = input('\nAre you sure you want to delete the analysis result? [y/N]: ') - if confirmation not in ('y', 'Y'): - return - - try: - self._api_client.analysis_result_delete(result_id) - except RequestsApiError as api_err: - if api_err.status_code == 404: - logger.warning("Analysis result %s not found.", result_id) - else: - raise IBMApiError(f"Failed to process the request: {api_err}") from None - - def create_figure( - self, - experiment_id: str, - figure: Union[str, bytes], - figure_name: Optional[str] = None, - sync_upload: bool = True - ) -> Tuple[str, int]: - """Store a new figure in the database. - - Note: - Currently only SVG figures are supported. - - Args: - experiment_id: ID of the experiment this figure is for. - figure: Name of the figure file or figure data to store. - figure_name: Name of the figure. If ``None``, the figure file name, if - given, or a generated name is used. - sync_upload: If ``True``, the plot will be uploaded synchronously. - Otherwise the upload will be asynchronous. - - Returns: - A tuple of the name and size of the saved figure. - - Raises: - IBMExperimentEntryExists: If the figure already exits. - IBMApiError: If the request to the server failed. - """ - if figure_name is None: - if isinstance(figure, str): - figure_name = figure - else: - figure_name = "figure_{}.svg".format(datetime.now().isoformat()) - if not figure_name.endswith(".svg"): - figure_name += ".svg" - - with map_api_error(f"Figure {figure_name} already exists."): - response = self._api_client.experiment_plot_upload(experiment_id, figure, figure_name, - sync_upload=sync_upload) - return response['name'], response['size'] - - def update_figure( - self, - experiment_id: str, - figure: Union[str, bytes], - figure_name: str, - sync_upload: bool = True - ) -> Tuple[str, int]: - """Update an existing figure. - - Args: - experiment_id: Experiment ID. - figure: Name of the figure file or figure data to store. - figure_name: Name of the figure. - sync_upload: If ``True``, the plot will be uploaded synchronously. - Otherwise the upload will be asynchronous. - - Returns: - A tuple of the name and size of the saved figure. - - Raises: - IBMExperimentEntryNotFound: If the figure does not exist. - IBMApiError: If the request to the server failed. - """ - with map_api_error(f"Figure {figure_name} not found."): - response = self._api_client.experiment_plot_update(experiment_id, figure, figure_name, - sync_upload=sync_upload) - - return response['name'], response['size'] - - def figure( - self, - experiment_id: str, - figure_name: str, - file_name: Optional[str] = None - ) -> Union[int, bytes]: - """Retrieve an existing figure. - - Args: - experiment_id: Experiment ID. - figure_name: Name of the figure. - file_name: Name of the local file to save the figure to. If ``None``, - the content of the figure is returned instead. - - Returns: - The size of the figure if `file_name` is specified. Otherwise the - content of the figure in bytes. - - Raises: - IBMExperimentEntryNotFound: If the figure does not exist. - IBMApiError: If the request to the server failed. - """ - with map_api_error(f"Figure {figure_name} not found."): - data = self._api_client.experiment_plot_get(experiment_id, figure_name) - - if file_name: - with open(file_name, 'wb') as file: - num_bytes = file.write(data) - return num_bytes - - return data - - def delete_figure( - self, - experiment_id: str, - figure_name: str - ) -> None: - """Delete an experiment plot. - - Note: - This method prompts for confirmation and requires a response before proceeding. - - Args: - experiment_id: Experiment ID. - figure_name: Name of the figure. - - Raises: - IBMApiError: If the request to the server failed. - """ - confirmation = input('\nAre you sure you want to delete the experiment plot? [y/N]: ') - if confirmation not in ('y', 'Y'): - return - - try: - self._api_client.experiment_plot_delete(experiment_id, figure_name) - except RequestsApiError as api_err: - if api_err.status_code == 404: - logger.warning("Figure %s not found.", figure_name) - else: - raise IBMApiError(f"Failed to process the request: {api_err}") from None - - def device_components( - self, - backend_name: Optional[str] = None - ) -> Union[Dict[str, List], List]: - """Return the device components. - - Args: - backend_name: Name of the backend whose components are to be retrieved. - - Returns: - A list of device components if `backend_name` is specified. Otherwise - a dictionary whose keys are backend names the values - are lists of device components for the backends. - - Raises: - IBMApiError: If the request to the server failed. - """ - with map_api_error(f"No device components found for backend {backend_name}"): - raw_data = self._api_client.device_components(backend_name) - - components = defaultdict(list) - for data in raw_data: - components[data['device_name']].append(data['type']) - - if backend_name: - return components[backend_name] - - return dict(components) - - @property - def preferences(self) -> Dict: - """Return saved experiment preferences. - - Note: - These are preferences passed to the applications that use this service - and have no effect on the service itself. It is up to the application, - such as ``qiskit-experiments`` to implement the preferences. - - Returns: - Dict: The experiment preferences. - """ - return self._preferences - - def save_preferences(self, auto_save: bool = None) -> None: - """Stores experiment preferences on disk. - - Note: - These are preferences passed to the applications that use this service - and have no effect on the service itself. - - For example, if ``auto_save`` is set to ``True``, it tells the application, - such as ``qiskit-experiments``, that you prefer changes to be - automatically saved. It is up to the application to implement the preferences. - - Args: - auto_save: Automatically save the experiment. - """ - update_cred = False - if auto_save is not None and auto_save != self._preferences["auto_save"]: - self._preferences['auto_save'] = auto_save - update_cred = True - - if update_cred: - store_preferences( - {self._default_hgp.credentials.unique_id(): {'experiment': self.preferences}}) diff --git a/qiskit_ibm/experiment/utils.py b/qiskit_ibm/experiment/utils.py deleted file mode 100644 index e81e9e04d..000000000 --- a/qiskit_ibm/experiment/utils.py +++ /dev/null @@ -1,33 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Utilities for working with IBM Quantum experiments.""" - -from typing import Generator -from contextlib import contextmanager - -from .exceptions import IBMExperimentEntryNotFound, IBMExperimentEntryExists -from ..api.exceptions import RequestsApiError -from ..exceptions import IBMApiError - - -@contextmanager -def map_api_error(error_msg: str = "") -> Generator[None, None, None]: - """Convert an ``RequestsApiError`` to a user facing error.""" - try: - yield - except RequestsApiError as api_err: - if api_err.status_code == 409: - raise IBMExperimentEntryExists(error_msg + f" {api_err}") from None - if api_err.status_code == 404: - raise IBMExperimentEntryNotFound(error_msg + f" {api_err}") from None - raise IBMApiError(f"Failed to process the request: {api_err}") from None diff --git a/qiskit_ibm/ibm_provider.py b/qiskit_ibm/ibm_provider.py index f1f7e48e4..1afc3e68c 100644 --- a/qiskit_ibm/ibm_provider.py +++ b/qiskit_ibm/ibm_provider.py @@ -37,7 +37,6 @@ from .hub_group_project import HubGroupProject # pylint: disable=cyclic-import from .ibm_backend_service import IBMBackendService # pylint: disable=cyclic-import from .random.ibm_random_service import IBMRandomService # pylint: disable=cyclic-import -from .experiment import IBMExperimentService # pylint: disable=cyclic-import from .runtime.ibm_runtime_service import IBMRuntimeService # pylint: disable=cyclic-import from .exceptions import (IBMNotAuthorizedError, IBMInputValueError, IBMProviderCredentialsNotFound, IBMProviderCredentialsInvalidFormat, IBMProviderCredentialsInvalidToken, @@ -86,8 +85,7 @@ class IBMProvider(Provider): in decreasing order of priority. * The hub/group/project you explicity specify when calling a service. - Ex: `provider.get_backend()`, `provider.runtime.run()`, - `provider.experiment.create_experiment()`, etc. + Ex: `provider.get_backend()`, `provider.runtime.run()`, etc. * The hub/group/project required for the service. * The default hub/group/project you set using `save_account()`. * A premium hub/group/project in your account. @@ -150,7 +148,6 @@ def __init__( Returns: An instance of IBMProvider with services like :class:`~qiskit_ibm.IBMBackendService`, :class:`~qiskit_ibm.runtime.IBMRuntimeService`, - :class:`~qiskit_ibm.experiment.IBMExperimentService` and :class:`~qiskit_ibm.random.IBMRandomService` as available to the account. @@ -392,7 +389,6 @@ def _get_hgps( def _initialize_services(self) -> None: """Initialize all services.""" self._backend = None - self._experiment = None self._random = None self._runtime = None hgps = self._get_hgps() @@ -401,20 +397,16 @@ def _initialize_services(self) -> None: if not self._backend: self._backend = IBMBackendService(self, hgp) # Initialize other services. - if not self._experiment: - self._experiment = IBMExperimentService(self, hgp) \ - if hgp.has_service('experiment') else None if not self._random: self._random = IBMRandomService(self, hgp) \ if hgp.has_service('random') else None if not self._runtime: self._runtime = IBMRuntimeService(self, hgp) \ if hgp.has_service('runtime') else None - if all([self._backend, self._experiment, self._random, self._runtime]): + if all([self._backend, self._random, self._runtime]): break self._services = {'backend': self._backend, 'random': self._random, - 'experiment': self._experiment, 'runtime': self._runtime} @property @@ -426,22 +418,6 @@ def backend(self) -> IBMBackendService: """ return self._backend - @property - def experiment(self) -> IBMExperimentService: - """Return the experiment service. - - Returns: - The experiment service instance. - - Raises: - IBMNotAuthorizedError: If the account is not authorized to use - the experiment service. - """ - if self._experiment: - return self._experiment - else: - raise IBMNotAuthorizedError("You are not authorized to use the experiment service.") - @property def random(self) -> IBMRandomService: """Return the random number service. diff --git a/requirements-dev.txt b/requirements-dev.txt index 751e72dec..384543042 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -25,4 +25,3 @@ qiskit_rng qiskit-aer websockets>=8 scikit-quant;platform_system != 'Windows' -qiskit-experiments; diff --git a/test/ibm/experiment/__init__.py b/test/ibm/experiment/__init__.py deleted file mode 100644 index a65671f5f..000000000 --- a/test/ibm/experiment/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Experiment related tests.""" diff --git a/test/ibm/experiment/test_experiment.py b/test/ibm/experiment/test_experiment.py deleted file mode 100644 index 3f0be4ebb..000000000 --- a/test/ibm/experiment/test_experiment.py +++ /dev/null @@ -1,56 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Experiment tests.""" - -from unittest import SkipTest - -from qiskit_ibm.exceptions import IBMNotAuthorizedError -from qiskit_ibm.credentials import read_credentials_from_qiskitrc - -from ...ibm_test_case import IBMTestCase -from ...decorators import requires_provider -from ...contextmanagers import no_envs, custom_qiskitrc, CREDENTIAL_ENV_VARS - - -class TestExperimentPreferences(IBMTestCase): - """Test experiment preferences.""" - - @classmethod - @requires_provider - def setUpClass(cls, provider, hub, group, project): - """Initial class level setup.""" - # pylint: disable=arguments-differ - super().setUpClass() - cls.provider = provider - cls.hub = hub - cls.group = group - cls.project = project - try: - cls.service = cls.provider.service('experiment') - except IBMNotAuthorizedError: - raise SkipTest("Not authorized to use experiment service.") - - def test_default_preferences(self): - """Test getting default preferences.""" - self.assertFalse(self.service.preferences['auto_save']) - - def test_set_preferences(self): - """Test setting preferences.""" - with custom_qiskitrc(), no_envs(CREDENTIAL_ENV_VARS): - self.service.save_preferences(auto_save=True) - self.assertTrue(self.service.preferences['auto_save']) - - # Read back from qiskitrc. - _, stored_pref = read_credentials_from_qiskitrc() - self.assertTrue( - stored_pref[self.provider.credentials.unique_id()]['experiment']['auto_save']) diff --git a/test/ibm/experiment/test_experiment_server_integration.py b/test/ibm/experiment/test_experiment_server_integration.py deleted file mode 100644 index 2bea48f99..000000000 --- a/test/ibm/experiment/test_experiment_server_integration.py +++ /dev/null @@ -1,1099 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Experiment integration test with server.""" - -import os -import uuid -from unittest import mock, SkipTest, skipIf -from datetime import datetime, timedelta -from typing import Optional, Dict, Any -import re -from dateutil import tz -import numpy as np - -from qiskit_ibm.experiment.constants import ExperimentShareLevel -from qiskit_ibm.exceptions import IBMNotAuthorizedError -from qiskit_ibm.experiment import ResultQuality, IBMExperimentEntryNotFound - -from ...ibm_test_case import IBMTestCase -from ...decorators import requires_provider, requires_device -from .utils import ExperimentEncoder, ExperimentDecoder - - -@skipIf(not os.environ.get('QISKIT_IBM_USE_STAGING_CREDENTIALS', ''), "Only runs on staging") -class TestExperimentServerIntegration(IBMTestCase): - """Test experiment modules.""" - - @classmethod - @requires_provider - def setUpClass(cls, provider, hub, group, project): - """Initial class level setup.""" - # pylint: disable=arguments-differ - super().setUpClass() - cls.provider = provider - cls.hub = hub - cls.group = group - cls.project = project - cls.backend = cls._setup_backend() # pylint: disable=no-value-for-parameter - try: - cls.device_components = cls.provider.experiment.device_components(cls.backend.name()) - except Exception: - raise SkipTest("Not authorized to use experiment service.") - - @classmethod - @requires_device - def _setup_backend(cls, backend): - """Get a backend for the class.""" - return backend - - def setUp(self) -> None: - """Test level setup.""" - super().setUp() - self.experiments_to_delete = [] - - def tearDown(self): - """Test level tear down.""" - for expr_uuid in self.experiments_to_delete: - try: - with mock.patch('builtins.input', lambda _: 'y'): - self.provider.experiment.delete_experiment(expr_uuid) - except Exception as err: # pylint: disable=broad-except - self.log.info("Unable to delete experiment %s: %s", expr_uuid, err) - super().tearDown() - - def test_unauthorized(self): - """Test unauthorized access.""" - saved_experiment = self.provider._experiment - try: - self.provider._experiment = None - with self.assertRaises(IBMNotAuthorizedError) as context_manager: - self.provider.experiment.experiments() - self.assertIn("experiment service", str(context_manager.exception)) - finally: - self.provider._experiment = saved_experiment - - def test_experiments(self): - """Test retrieving experiments.""" - exp_id = self._create_experiment() - experiments = self.provider.experiment.experiments() - self.assertTrue(experiments, "No experiments found.") - - found = False - for exp in experiments: - self.assertTrue(exp["experiment_id"], "{} does not have an ID!".format(exp)) - for dt_attr in ['start_datetime', 'creation_datetime', - 'end_datetime', 'updated_datetime']: - if getattr(exp, dt_attr, None): - self.assertTrue(getattr(exp, dt_attr).tzinfo) - if exp["experiment_id"] == exp_id: - found = True - self.assertTrue(found, f"Experiment {exp_id} not found!") - - def test_experiments_with_backend(self): - """Test retrieving all experiments for a specific backend.""" - exp_id = self._create_experiment() - backend_experiments = self.provider.experiment.experiments( - backend_name=self.backend.name()) - - found = False - for exp in backend_experiments: - self.assertEqual(self.backend.name(), exp["backend"].name()) - if exp["experiment_id"] == exp_id: - found = True - self.assertTrue(found, "Experiment {} not found when filter by backend name {}.".format( - exp_id, self.backend.name())) - - def test_experiments_with_type(self): - """Test retrieving all experiments for a specific type.""" - exp_type = 'qiskit_test' - exp_id = self._create_experiment(experiment_type=exp_type) - backend_experiments = self.provider.experiment.experiments( - experiment_type=exp_type) - - found = False - for exp in backend_experiments: - self.assertEqual(exp_type, exp["experiment_type"]) - if exp["experiment_id"] == exp_id: - found = True - self.assertTrue(found, "Experiment {} not found when filter by type {}.".format( - exp_id, exp_type)) - - def test_experiments_with_parent_id(self): - """Test retrieving all experiments for a specific parent id.""" - parent_id = self._create_experiment() - child_id = self._create_experiment(parent_id=parent_id) - experiments = self.provider.experiment.experiments( - parent_id=parent_id) - - found = False - for exp in experiments: - self.assertEqual(parent_id, exp["parent_id"]) - if exp["experiment_id"] == child_id: - found = True - self.assertTrue(found, "Experiment {} not found when filter by type {}.".format( - child_id, parent_id)) - - def test_experiments_with_type_operator(self): - """Test retrieving all experiments for a specific type with operator.""" - exp_type = 'qiskit_test' - exp_id = self._create_experiment(experiment_type=exp_type) - - experiments = self.provider.experiment.experiments( - experiment_type="foo", experiment_type_operator="like") - self.assertNotIn(exp_id, [exp["experiment_id"] for exp in experiments]) - - subtests = ["qiskit", "test"] - for filter_type in subtests: - with self.subTest(filter_type=filter_type): - experiments = self.provider.experiment.experiments( - experiment_type=exp_type, experiment_type_operator="like") - found = False - for exp in experiments: - self.assertTrue(re.match(f".*{filter_type}.*", exp["experiment_type"])) - if exp["experiment_id"] == exp_id: - found = True - self.assertTrue(found, f"Experiment {exp_id} not found " - f"when filter by type {filter_type}") - - def test_experiments_with_bad_type_operator(self): - """Test retrieving all experiments with a bad type operator.""" - with self.assertRaises(ValueError): - self.provider.experiment.experiments( - experiment_type="foo", experiment_type_operator="bad") - - def test_experiments_with_start_time(self): - """Test retrieving an experiment by its start_time.""" - ref_start_dt = datetime.now() - timedelta(days=1) - ref_start_dt = ref_start_dt.replace(tzinfo=tz.tzlocal()) - exp_id = self._create_experiment(start_datetime=ref_start_dt) - - before_start = ref_start_dt - timedelta(hours=1) - after_start = ref_start_dt + timedelta(hours=1) - sub_tests = [(before_start, None, True, "before start, None"), - (None, after_start, True, "None, after start"), - (before_start, after_start, True, "before, after start"), - (after_start, None, False, "after start, None"), - (None, before_start, False, "None, before start"), - (before_start, before_start, False, "before, before start") - ] - - for start_dt, end_dt, expected, title in sub_tests: - with self.subTest(title=title): - backend_experiments = self.provider.experiment.experiments( - start_datetime_after=start_dt, start_datetime_before=end_dt, - experiment_type='qiskit_test') - found = False - for exp in backend_experiments: - if start_dt: - self.assertGreaterEqual(exp["start_datetime"], start_dt) - if end_dt: - self.assertLessEqual(exp["start_datetime"], end_dt) - if exp["experiment_id"] == exp_id: - found = True - self.assertEqual(found, expected, - "Experiment {} (not)found unexpectedly when filter using" - "start_dt={}, end_dt={}. Found={}".format( - exp_id, start_dt, end_dt, found)) - - def test_experiments_with_tags(self): - """Test filtering experiments using tags.""" - ref_tags = ["qiskit_test", "foo"] - exp_id = self._create_experiment(tags=ref_tags) - - phantom_tag = uuid.uuid4().hex - sub_tests = [ - (ref_tags, 'AND', True), - (ref_tags, 'OR', True), - (ref_tags[:1], "OR", True), - (ref_tags + [phantom_tag], "AND", False), - (ref_tags + [phantom_tag], "OR", True), - ([phantom_tag], "OR", False) - ] - for tags, operator, found in sub_tests: - with self.subTest(tags=tags, operator=operator): - experiments = self.provider.experiment.experiments( - tags=tags, tags_operator=operator) - ref_expr_found = False - for expr in experiments: - msg = "Tags {} not fond in experiment tags {}".format(tags, expr["tags"]) - if operator == 'AND': - self.assertTrue(all(f_tag in expr["tags"] for f_tag in tags), msg) - else: - self.assertTrue(any(f_tag in expr["tags"] for f_tag in tags), msg) - if expr["experiment_id"] == exp_id: - ref_expr_found = True - self.assertTrue(ref_expr_found == found, - "Experiment tags {} unexpectedly (not)found. Found={}".format( - ref_tags, found)) - - def test_experiments_with_hgp(self): - """Test retrieving all experiments for a specific h/g/p.""" - exp_id = self._create_experiment() - sub_tests = [ - {'hub': self.hub}, - {'hub': self.hub, 'group': self.group}, - {'hub': self.hub, 'group': self.group, 'project': self.project} - ] - - for hgp_kwargs in sub_tests: - with self.subTest(kwargs=hgp_kwargs.keys()): - hgp_experiments = self.provider.experiment.experiments(**hgp_kwargs) - ref_expr_found = False - for expr in hgp_experiments: - for hgp_key, hgp_val in hgp_kwargs.items(): - self.assertEqual(expr[hgp_key], hgp_val) - if expr["experiment_id"] == exp_id: - ref_expr_found = True - self.assertTrue(ref_expr_found) - - def test_experiments_with_hgp_error(self): - """Test retrieving experiments with bad h/g/p specification.""" - sub_tests = [ - ({'project': 'test_project'}, ['hub', 'group']), - ({'project': 'test_project', 'group': 'test_group'}, ['hub']), - ({'project': 'test_project', 'hub': 'test_hub'}, ['group']), - ({'group': 'test_group'}, ['hub']) - ] - - for hgp_kwargs, missing_keys in sub_tests: - with self.subTest(kwargs=hgp_kwargs.keys()): - with self.assertRaises(ValueError) as ex_cm: - self.provider.experiment.experiments(**hgp_kwargs) - for key in missing_keys: - self.assertIn(key, str(ex_cm.exception)) - - def test_experiments_with_exclude_public(self): - """Tests retrieving experiments with exclude_public filter.""" - # Make sure that we have at least one public experiment and one non-public - # experiment. - public_exp_id = self._create_experiment(share_level=ExperimentShareLevel.PUBLIC) - private_exp_id = self._create_experiment(share_level=ExperimentShareLevel.PRIVATE) - - experiments = self.provider.experiment.experiments(exclude_public=True) - # The public experiment we just created should not be in the set. - non_public_experiment_uuids = [] - for experiment in experiments: - self.assertNotEqual( - experiment["share_level"], ExperimentShareLevel.PUBLIC.value, - 'Public experiment should not be returned with exclude_public filter: %s' % - experiment) - non_public_experiment_uuids.append(experiment["experiment_id"]) - self.assertIn( - private_exp_id, non_public_experiment_uuids, - 'Non-public experiment not returned with exclude_public filter: %s' % - private_exp_id) - self.assertNotIn( - public_exp_id, non_public_experiment_uuids, - 'Public experiment returned with exclude_public filter: %s' % - public_exp_id) - - def test_experiments_with_public_only(self): - """Tests retrieving experiments with public_only filter.""" - # Make sure that we have at least one public experiment and one non-public - # experiment. - public_exp_id = self._create_experiment(share_level=ExperimentShareLevel.PUBLIC) - private_exp_id = self._create_experiment(share_level=ExperimentShareLevel.PRIVATE) - - experiments = self.provider.experiment.experiments(public_only=True) - public_experiment_uuids = [] - for experiment in experiments: - self.assertEqual( - experiment["share_level"], ExperimentShareLevel.PUBLIC.value, - 'Only public experiments should be returned with public_only filter: %s' % - experiment) - public_experiment_uuids.append(experiment["experiment_id"]) - self.assertIn( - public_exp_id, public_experiment_uuids, - 'Public experiment not returned with public_only filter: %s' % - public_exp_id) - self.assertNotIn( - private_exp_id, public_experiment_uuids, - 'Non-public experiment returned with public_only filter: %s' % - private_exp_id) - - def test_experiments_with_public_filters_error(self): - """Tests that exclude_public and public_only cannot both be True.""" - with self.assertRaisesRegex( - ValueError, - 'exclude_public and public_only cannot both be True'): - self.provider.experiment.experiments(exclude_public=True, public_only=True) - - def test_experiments_with_exclude_mine(self): - """Tests retrieving experiments with exclude_mine filter.""" - # Note that we cannot specify the owner when creating the experiment, the value comes - # from the user profile via the token so we would have to use different test accounts - # to explicitly create separately-owned experiments. We should be able to assume that - # there is at least one experiment owned by another user in the integration test - # environment though. - exp_id = self._create_experiment() - exp_owner = self.provider.experiment.experiment(exp_id)["owner"] - - not_my_experiments = self.provider.experiment.experiments(exclude_mine=True) - # The experiment we just created should not be in the set. - not_mine_experiment_uuids = [] - for experiment in not_my_experiments: - self.assertNotEqual( - experiment["owner"], exp_owner, # pylint: disable=no-member - 'My experiment should not be returned with exclude_mine filter: %s' % - experiment["experiment_id"]) - not_mine_experiment_uuids.append(experiment["experiment_id"]) - self.assertNotIn( - exp_id, not_mine_experiment_uuids, - 'My experiment returned with exclude_mine filter: %s' % - exp_id) - - def test_experiments_with_mine_only(self): - """Tests retrieving experiments with mine_only filter.""" - # Note that we cannot specify the owner when creating the experiment, the value comes - # from the user profile via the token so we would have to use different test accounts - # to explicitly create separately-owned epxeriments. We should be able to assume that - # there is at least one experiment owned by another user in the integration test - # environment though. - exp_id = self._create_experiment() - exp_owner = self.provider.experiment.experiment(exp_id)["owner"] - my_experiments = self.provider.experiment.experiments(mine_only=True) - my_experiment_uuids = [] - for experiment in my_experiments: - self.assertEqual( - experiment["owner"], exp_owner, # pylint: disable=no-member - 'Only my experiments should be returned with mine_only filter: %s' % - experiment["experiment_id"]) - my_experiment_uuids.append(experiment["experiment_id"]) - self.assertIn( - exp_id, my_experiment_uuids, - 'My experiment not returned with mine_only filter: %s' % - exp_id) - - def test_experiments_with_owner_filters_error(self): - """Tests that exclude_mine and mine_only cannot both be True.""" - with self.assertRaisesRegex( - ValueError, - 'exclude_mine and mine_only cannot both be True'): - self.provider.experiment.experiments(exclude_mine=True, mine_only=True) - - def test_experiments_with_limit(self): - """Test retrieving experiments with limit.""" - self._create_experiment() - experiments = self.provider.experiment.experiments(limit=1) - self.assertEqual(1, len(experiments)) - - def test_experiments_with_no_limit(self): - """Test retrieving experiments with no limit.""" - tags = [str(uuid.uuid4())] - exp_id = self._create_experiment(tags=tags) - experiments = self.provider.experiment.experiments(limit=None, tags=tags) - self.assertEqual(1, len(experiments)) - self.assertEqual(exp_id, experiments[0]["experiment_id"]) - - def test_experiments_with_sort_by(self): - """Test retrieving experiments with sort_by.""" - tags = [str(uuid.uuid4())] - exp1 = self._create_experiment(tags=tags, - experiment_type="qiskit_test1", - start_datetime=datetime.now()-timedelta(hours=1)) - exp2 = self._create_experiment(tags=tags, - experiment_type="qiskit_test2", - start_datetime=datetime.now()) - exp3 = self._create_experiment(tags=tags, - experiment_type="qiskit_test1", - start_datetime=datetime.now()-timedelta(hours=2)) - - subtests = [ - (["experiment_type:asc"], [exp1, exp3, exp2]), - (["experiment_type:desc"], [exp2, exp1, exp3]), - (["start_datetime:asc"], [exp3, exp1, exp2]), - (["start_datetime:desc"], [exp2, exp1, exp3]), - (["experiment_type:asc", "start_datetime:asc"], [exp3, exp1, exp2]), - (["experiment_type:asc", "start_datetime:desc"], [exp1, exp3, exp2]), - (["experiment_type:desc", "start_datetime:asc"], [exp2, exp3, exp1]), - (["experiment_type:desc", "start_datetime:desc"], [exp2, exp1, exp3]), - ] - - for sort_by, expected in subtests: - with self.subTest(sort_by=sort_by): - experiments = self.provider.experiment.experiments(tags=tags, sort_by=sort_by) - self.assertEqual(expected, [exp["experiment_id"] for exp in experiments]) - - def test_experiments_with_bad_sort_by(self): - """Test retrieving experiments with bad sort_by.""" - subtests = ["experiment_id:asc", "experiment_type", "experiment_type:foo", "foo:bar"] - - for sort_by in subtests: - with self.subTest(sort_by=sort_by): - with self.assertRaises(ValueError): - self.provider.experiment.experiments(sort_by=sort_by) - - def test_experiments_with_device_components(self): - """Test filtering experiments with device components.""" - expr_id = self._create_experiment() - self._create_analysis_result(exp_id=expr_id, - device_components=self.device_components) - experiments = self.provider.experiment.experiments( - device_components=self.device_components) - self.assertIn(expr_id, [expr["experiment_id"] for expr in experiments], - f"Experiment {expr_id} not found when filtering with " - f"device components {self.device_components}") - - def test_experiments_with_device_components_operator(self): - """Test filtering experiments with device components operator.""" - backend_name, device_components = self._find_backend_device_components(3) - if not backend_name: - self.skipTest("Need at least 3 device components.") - - expr_id = self._create_experiment(backend_name=backend_name) - self._create_analysis_result(exp_id=expr_id, - device_components=device_components) - experiments = self.provider.experiment.experiments( - device_components=device_components[:2], - device_components_operator="contains") - - self.assertIn(expr_id, [expr["experiment_id"] for expr in experiments], - f"Experiment {expr_id} not found when filtering with " - f"device components {device_components[:2]}") - - def test_experiments_with_bad_components_operator(self): - """Test filtering experiments with bad device components operator.""" - with self.assertRaises(ValueError): - self.provider.experiment.experiments( - device_components=["Q1"], - device_components_operator="foo") - - def test_retrieve_experiment(self): - """Test retrieving an experiment by its ID.""" - exp_id = self._create_experiment() - rexp = self.provider.experiment.experiment(exp_id) - self.assertEqual(exp_id, rexp["experiment_id"]) - for attr in ['hub', 'group', 'project', 'owner', 'share_level']: - self.assertIsNotNone(rexp[attr], "{} does not have a {}".format(rexp, attr)) - - def test_upload_experiment(self): - """Test uploading an experiment.""" - exp_id = str(uuid.uuid4()) - new_exp_id = self.provider.experiment.create_experiment( - experiment_type="qiskit_test", - backend_name=self.backend.name(), - metadata={"foo": "bar"}, - experiment_id=exp_id, - job_ids=["job1", "job2"], - tags=["qiskit_test"], - notes="some notes", - share_level=ExperimentShareLevel.PROJECT, - start_datetime=datetime.now() - ) - self.experiments_to_delete.append(new_exp_id) - self.assertEqual(exp_id, new_exp_id) - new_exp = self.provider.experiment.experiment(new_exp_id) - - self.assertEqual(self.hub, new_exp["hub"]) # pylint: disable=no-member - self.assertEqual(self.group, new_exp["group"]) # pylint: disable=no-member - self.assertEqual(self.project, new_exp["project"]) # pylint: disable=no-member - self.assertEqual("qiskit_test", new_exp["experiment_type"]) - self.assertEqual(self.backend.name(), new_exp["backend"].name()) - self.assertEqual({"foo": "bar"}, new_exp["metadata"]) - self.assertEqual(["job1", "job2"], new_exp["job_ids"]) - self.assertEqual(["qiskit_test"], new_exp["tags"]) - self.assertEqual("some notes", new_exp["notes"]) - self.assertEqual(ExperimentShareLevel.PROJECT.value, new_exp["share_level"]) - self.assertTrue(new_exp["creation_datetime"]) - self.assertIsNotNone(new_exp["owner"], 'Owner should be set') # pylint: disable=no-member - - for dt_attr in ['start_datetime', 'creation_datetime', 'end_datetime', 'updated_datetime']: - if dt_attr in new_exp: - self.assertTrue(new_exp[dt_attr].tzinfo) - - def test_update_experiment(self): - """Test updating an experiment.""" - new_exp_id = self._create_experiment() - - self.provider.experiment.update_experiment( - experiment_id=new_exp_id, - metadata={"foo": "bar"}, - job_ids=["job1", "job2"], - tags=["qiskit_test"], - notes="some notes", - share_level=ExperimentShareLevel.PROJECT, - end_datetime=datetime.now() - ) - - rexp = self.provider.experiment.experiment(new_exp_id) - self.assertEqual({"foo": "bar"}, rexp["metadata"]) - self.assertEqual(["job1", "job2"], rexp["job_ids"]) - self.assertEqual(["qiskit_test"], rexp["tags"]) - self.assertEqual("some notes", rexp["notes"]) - self.assertEqual(ExperimentShareLevel.PROJECT.value, rexp["share_level"]) - self.assertTrue(rexp["end_datetime"]) - - def test_delete_experiment(self): - """Test deleting an experiment.""" - new_exp_id = self._create_experiment(notes='delete me') - - with mock.patch('builtins.input', lambda _: 'y'): - self.provider.experiment.delete_experiment(new_exp_id) - - with self.assertRaises(IBMExperimentEntryNotFound) as ex_cm: - self.provider.experiment.experiment(new_exp_id) - self.assertIn("Not Found for url", ex_cm.exception.message) - - def test_upload_analysis_result(self): - """Test uploading an analysis result.""" - exp_id = self._create_experiment() - fit = dict(value=41.456, variance=4.051) - result_id = str(uuid.uuid4()) - chisq = 1.3253 - aresult_id = self.provider.experiment.create_analysis_result( - experiment_id=exp_id, - result_type="qiskit_test", - result_data=fit, - device_components=self.device_components, - tags=["qiskit_test"], - quality=ResultQuality.GOOD, - verified=True, - result_id=result_id, - chisq=chisq - ) - - rresult = self.provider.experiment.analysis_result(aresult_id) - self.assertEqual(exp_id, rresult["experiment_id"]) - self.assertEqual("qiskit_test", rresult["result_type"]) - self.assertEqual(fit, rresult["result_data"]) - self.assertEqual(self.device_components, - [str(comp) for comp in rresult["device_components"]]) - self.assertEqual(["qiskit_test"], rresult["tags"]) - self.assertEqual(ResultQuality.GOOD, rresult["quality"]) - self.assertTrue(rresult["verified"]) - self.assertEqual(result_id, rresult["result_id"]) - self.assertEqual(chisq, rresult["chisq"]) - - def test_update_analysis_result(self): - """Test updating an analysis result.""" - result_id = self._create_analysis_result() - fit = dict(value=41.456, variance=4.051) - chisq = 1.3253 - - self.provider.experiment.update_analysis_result( - result_id=result_id, - result_data=fit, - tags=["qiskit_test"], - quality=ResultQuality.GOOD, - verified=True, - chisq=chisq - ) - - rresult = self.provider.experiment.analysis_result(result_id) - self.assertEqual(result_id, rresult["result_id"]) - self.assertEqual(fit, rresult["result_data"]) - self.assertEqual(["qiskit_test"], rresult["tags"]) - self.assertEqual(ResultQuality.GOOD, rresult["quality"]) - self.assertTrue(rresult["verified"]) - self.assertEqual(chisq, rresult["chisq"]) - - def test_analysis_results(self): - """Test retrieving all analysis results.""" - result_id = self._create_analysis_result() - results = self.provider.experiment.analysis_results() - found = False - for res in results: - self.assertIsInstance(res["verified"], bool) - self.assertIsInstance(res["result_data"], dict) - self.assertTrue(res["result_id"], "{} does not have an uuid!".format(res)) - for dt_attr in ['creation_datetime', 'updated_datetime']: - if dt_attr in res: - self.assertTrue(res[dt_attr].tzinfo) - if res["result_id"] == result_id: - found = True - self.assertTrue(found) - - def test_analysis_results_device_components(self): - """Test filtering analysis results with device components.""" - result_id = self._create_analysis_result(device_components=self.device_components) - results = self.provider.experiment.analysis_results( - device_components=self.device_components) - - found = False - for res in results: - self.assertEqual(self.device_components, - [str(comp) for comp in res["device_components"]]) - if res["result_id"] == result_id: - found = True - self.assertTrue(found, f"Result {result_id} not found when filtering by " - f"device components {self.device_components}") - - def test_analysis_results_device_components_operator(self): - """Test filtering analysis results with device components operator.""" - backend_name, device_components = self._find_backend_device_components(3) - if not backend_name: - self.skipTest("Need at least 3 device components.") - - expr_id = self._create_experiment(backend_name=backend_name) - result_id = self._create_analysis_result(exp_id=expr_id, - device_components=device_components) - results = self.provider.experiment.analysis_results( - device_components=device_components[:2], device_components_operator="contains") - - found = False - for res in results: - self.assertTrue(set(device_components[:2]) <= - {str(comp) for comp in res["device_components"]}) - if res["result_id"] == result_id: - found = True - self.assertTrue(found, f"Result {result_id} not found when filtering by " - f"device components {device_components[:2]}") - - def test_analysis_results_experiment_id(self): - """Test filtering analysis results with experiment id.""" - expr_id = self._create_experiment() - result_id1 = self._create_analysis_result(exp_id=expr_id) - result_id2 = self._create_analysis_result(exp_id=expr_id) - - results = self.provider.experiment.analysis_results(experiment_id=expr_id) - self.assertEqual(2, len(results)) - self.assertEqual({result_id1, result_id2}, {res["result_id"] for res in results}) - - def test_analysis_results_type(self): - """Test filtering analysis results with type.""" - result_type = "qiskit_test" - result_id = self._create_analysis_result(result_type=result_type) - results = self.provider.experiment.analysis_results(result_type=result_type) - found = False - for res in results: - self.assertEqual(result_type, res["result_type"]) - if res["result_id"] == result_id: - found = True - self.assertTrue(found, f"Result {result_id} not returned when filtering by " - f"type {result_type}") - - def test_analysis_results_type_operator(self): - """Test filtering analysis results with type operator.""" - result_type = "qiskit_test_1234" - result_id = self._create_analysis_result(result_type=result_type) - - results = self.provider.experiment.analysis_results( - result_type="foo", result_type_operator="like") - self.assertNotIn(result_id, [res["result_id"] for res in results]) - - subtests = ["qiskit_test", "test_1234"] - for filter_type in subtests: - with self.subTest(filter_type=filter_type): - results = self.provider.experiment.analysis_results( - result_type=filter_type, - result_type_operator="like") - - found = False - for res in results: - self.assertIn(filter_type, res["result_type"]) - if res["result_id"] == result_id: - found = True - self.assertTrue(found, f"Result {result_id} not returned when filtering by " - f"type substring {filter_type}") - - def test_analysis_results_bad_type_operator(self): - """Test retrieving all experiments with a bad type operator.""" - with self.assertRaises(ValueError): - self.provider.experiment.analysis_results( - result_type="foo", result_type_operator="bad") - - def test_analysis_results_quality(self): - """Test filtering analysis results with quality.""" - expr_id = self._create_experiment() - result_id1 = self._create_analysis_result(exp_id=expr_id, quality=ResultQuality.GOOD) - result_id2 = self._create_analysis_result(exp_id=expr_id, quality=ResultQuality.BAD) - result_id3 = self._create_analysis_result(exp_id=expr_id, quality=ResultQuality.UNKNOWN) - - subtests = [(ResultQuality.GOOD, {result_id1}), - (ResultQuality.BAD.value, {result_id2}), - ("unknown", {result_id3}), - ([ResultQuality.UNKNOWN], {result_id3}), - ([ResultQuality.GOOD, ResultQuality.UNKNOWN], {result_id1, result_id3}), - (["Good", "Bad"], {result_id1, result_id2}), - ([ResultQuality.UNKNOWN, ResultQuality.BAD], {result_id3, result_id2}), - ([ResultQuality.GOOD, ResultQuality.BAD, ResultQuality.UNKNOWN], - {result_id1, result_id2, result_id3}) - ] - - for quality, expected in subtests: - with self.subTest(quality=quality): - results = self.provider.experiment.analysis_results(quality=quality) - if not isinstance(quality, list): - quality = [quality] - qual_set = [] - for qual in quality: - if isinstance(qual, str): - qual = ResultQuality(qual.upper()) - qual_set.append(qual) - res_ids = set() - for res in results: - self.assertIn(res["quality"], qual_set) - res_ids.add(res["result_id"]) - self.assertTrue(expected <= res_ids, - f"Result {expected} not returned " - f"when filter with quality {quality}") - - def test_analysis_results_backend_name(self): - """Test filtering analysis results with backend name.""" - result_id = self._create_analysis_result() - results = self.provider.experiment.analysis_results(backend_name=self.backend.name()) - self.assertIn(result_id, [res["result_id"] for res in results]) - - def test_analysis_results_verified(self): - """Test filtering analysis results with verified.""" - result_id = self._create_analysis_result(verified=True) - results = self.provider.experiment.analysis_results(verified=True) - found = False - for res in results: - self.assertTrue(res["verified"]) - if res["result_id"] == result_id: - found = True - self.assertTrue(found, f"Result {result_id} not found when " - f"filtering with verified=True") - - def test_analysis_results_with_tags(self): - """Test filtering analysis results using tags.""" - ref_tags = ["qiskit_test", "foo"] - result_id = self._create_analysis_result(tags=ref_tags) - - phantom_tag = uuid.uuid4().hex - sub_tests = [ - (ref_tags, 'AND', True), - (ref_tags, 'OR', True), - (ref_tags[:1], "OR", True), - (ref_tags + [phantom_tag], "AND", False), - (ref_tags + [phantom_tag], "OR", True), - ([phantom_tag], "OR", False) - ] - for tags, operator, found in sub_tests: - with self.subTest(tags=tags, operator=operator): - results = self.provider.experiment.analysis_results( - tags=tags, tags_operator=operator) - res_found = False - for res in results: - msg = "Tags {} not fond in result tags {}".format(tags, res["tags"]) - if operator == 'AND': - self.assertTrue(all(f_tag in res["tags"] for f_tag in tags), msg) - else: - self.assertTrue(any(f_tag in res["tags"] for f_tag in tags), msg) - if res["result_id"] == result_id: - res_found = True - self.assertTrue(res_found == found, - "Result tags {} unexpectedly (not)found. Found={}".format( - ref_tags, found)) - - def test_analysis_results_with_limit(self): - """Test retrieving analysis results with limit.""" - self._create_analysis_result() - results = self.provider.experiment.analysis_results(limit=1) - self.assertEqual(1, len(results)) - - def test_analysis_results_with_no_limit(self): - """Test retrieving analysis results with no limit.""" - tags = [str(uuid.uuid4())] - result_id = self._create_analysis_result(tags=tags) - results = self.provider.experiment.analysis_results(limit=None, tags=tags) - self.assertEqual(1, len(results)) - self.assertEqual(result_id, results[0]["result_id"]) - - def test_analysis_results_with_sort_by(self): - """Test retrieving analysis results with sort_by.""" - tags = [str(uuid.uuid4())] - backend, components = self._find_backend_device_components(3) - backend_name = backend or self.backend.name() - device_components = components or self.device_components - if len(device_components) < 3: - device_components = [None]*3 # Skip testing device components. - device_components.sort() - expr_id = self._create_experiment(backend_name=backend_name) - - res1 = self._create_analysis_result(exp_id=expr_id, tags=tags, - result_type="qiskit_test1", - device_components=device_components[2]) - res2 = self._create_analysis_result(exp_id=expr_id, tags=tags, - result_type="qiskit_test2", - device_components=device_components[0]) - res3 = self._create_analysis_result(exp_id=expr_id, tags=tags, - result_type="qiskit_test1", - device_components=device_components[1]) - - subtests = [ - (["result_type:asc"], [res3, res1, res2]), - (["result_type:desc"], [res2, res3, res1]), - (["creation_datetime:asc"], [res1, res2, res3]), - (["creation_datetime:desc"], [res3, res2, res1]), - (["result_type:asc", "creation_datetime:asc"], [res1, res3, res2]), - (["result_type:asc", "creation_datetime:desc"], [res3, res1, res2]), - (["result_type:desc", "creation_datetime:asc"], [res2, res1, res3]), - (["result_type:desc", "creation_datetime:desc"], [res2, res3, res1]), - ] - if device_components[0]: - subtests += [ - (["device_components:asc"], [res2, res3, res1]), - (["device_components:desc"], [res1, res3, res2]), - (["result_type:asc", "device_components:desc"], [res1, res3, res2]) - ] - - for sort_by, expected in subtests: - with self.subTest(sort_by=sort_by): - results = self.provider.experiment.analysis_results(tags=tags, sort_by=sort_by) - self.assertEqual(expected, [res["result_id"] for res in results]) - - def test_analysis_results_with_bad_sort_by(self): - """Test retrieving analysis results with bad sort_by.""" - subtests = ["result_id:asc", "result_type", "result_type:foo", "foo:bar"] - - for sort_by in subtests: - with self.subTest(sort_by=sort_by): - with self.assertRaises(ValueError): - self.provider.experiment.analysis_results(sort_by=sort_by) - - def test_analysis_results_with_creation_datetime(self): - """Test retrieving analysis_results with creation_datetime""" - # Create an analysis_result and get it back to get its creation_datetime value. - result1_id = self._create_analysis_result() - result1 = self.provider.experiment.analysis_result(result1_id) - self.assertIn('creation_datetime', result1) - self.assertIsNotNone(result1['creation_datetime']) - cdt1 = result1['creation_datetime'] - # Assert that the UTC timestamp was converted to the local time. - self.assertIsNotNone(cdt1.tzinfo) - self.log.debug('Created first analysis result %s with creation_datetime %s', - result1_id, cdt1.isoformat()) - # Get the analysis result back using the exact creation timestamp - # using both ge and le prefixes. - results = self.provider.experiment.analysis_results( - creation_datetime_after=cdt1, - creation_datetime_before=cdt1 - ) - # Chances are that we should only get exactly one analysis result - # back but to be safe check for at least 1. - self.assertGreaterEqual(len(results), 1, results) - result_ids = [r['result_id'] for r in results] - self.assertIn(result1_id, result_ids) - # Create another analysis result on the same experiment. - result2_id = self._create_analysis_result(exp_id=result1['experiment_id']) - result2 = self.provider.experiment.analysis_result(result2_id) - cdt2 = result2['creation_datetime'] - self.log.debug('Created second analysis result %s with creation_datetime %s', - result2_id, cdt2.isoformat()) - # Get both results using their creation timestamps as a range. - results = self.provider.experiment.analysis_results( - creation_datetime_after=cdt1, - creation_datetime_before=cdt2 - ) - self.assertGreaterEqual(len(results), 2, results) - result_ids = [r['result_id'] for r in results] - for result_id in [result1_id, result2_id]: - self.assertIn(result_id, result_ids) - - def test_delete_analysis_result(self): - """Test deleting an analysis result.""" - result_id = self._create_analysis_result() - with mock.patch('builtins.input', lambda _: 'y'): - self.provider.experiment.delete_analysis_result(result_id) - - with self.assertRaises(IBMExperimentEntryNotFound): - self.provider.experiment.analysis_result(result_id) - - def test_backend_components(self): - """Test retrieving all device components.""" - device_components = self.provider.experiment.device_components() - self.assertTrue(device_components) - - def test_backend_components_backend_name(self): - """Test retrieving device components for a specific backend.""" - device_components = self.provider.experiment.device_components() - backend = list(device_components.keys())[0] - backend_components = self.provider.experiment.device_components(backend) - self.assertEqual(device_components[backend], backend_components) - - def test_retrieve_backends(self): - """Test retrieving all backends.""" - backends = self.provider.experiment.backends() - self.assertIn(self.backend.name(), [b['name'] for b in backends]) - - def test_create_figure(self): - """Test creating a figure.""" - hello_bytes = str.encode("hello world") - file_name = "hello_world.svg" - figure_name = "hello.svg" - with open(file_name, 'wb') as file: - file.write(hello_bytes) - self.addCleanup(os.remove, file_name) - - subtests = [ - (hello_bytes, None), - (hello_bytes, figure_name), - (file_name, None), - (file_name, file_name) - ] - - for figure, figure_name in subtests: - title = f"figure_name={figure_name}" if figure_name else f"figure={figure}" - with self.subTest(title=title): - expr_id = self._create_experiment() - name, _ = self.provider.experiment.create_figure( - experiment_id=expr_id, - figure=figure, - figure_name=figure_name - ) - if figure_name: - self.assertEqual(figure_name, name) - elif isinstance(figure, str): - self.assertEqual(figure, name) - expr = self.provider.experiment.experiment(expr_id) - self.assertIn(name, expr["figure_names"]) - - def test_figure(self): - """Test getting a figure.""" - hello_bytes = str.encode("hello world") - figure_name = "hello.svg" - expr_id = self._create_experiment() - self.provider.experiment.create_figure( - experiment_id=expr_id, - figure=hello_bytes, - figure_name=figure_name - ) - file_name = "hello_world.svg" - self.addCleanup(os.remove, file_name) - - subtests = [ - (figure_name, None), - (figure_name, file_name) - ] - - for figure_name, file_name in subtests: - with self.subTest(file_name=file_name): - fig = self.provider.experiment.figure(expr_id, figure_name, file_name) - if file_name: - with open(file_name, 'rb') as file: - self.assertEqual(hello_bytes, file.read()) - else: - self.assertEqual(hello_bytes, fig) - - def test_update_figure(self): - """Test uploading and updating plot data.""" - figure_name = "hello.svg" - expr_id = self._create_experiment() - self.provider.experiment.create_figure( - experiment_id=expr_id, - figure=str.encode("hello world"), - figure_name=figure_name - ) - friend_bytes = str.encode("hello friend!") - name, _ = self.provider.experiment.update_figure( - experiment_id=expr_id, - figure=friend_bytes, - figure_name=figure_name - ) - self.assertEqual(name, figure_name) - rplot = self.provider.experiment.figure(expr_id, figure_name) - self.assertEqual(rplot, friend_bytes, "Retrieved plot not equal updated plot.") - - def test_delete_figure(self): - """Test deleting a figure.""" - figure_name = "hello.svg" - expr_id = self._create_experiment() - self.provider.experiment.create_figure( - experiment_id=expr_id, - figure=str.encode("hello world"), - figure_name=figure_name - ) - with mock.patch('builtins.input', lambda _: 'y'): - self.provider.experiment.delete_figure(expr_id, figure_name) - self.assertRaises(IBMExperimentEntryNotFound, - self.provider.experiment.figure, expr_id, figure_name) - - def test_experiment_coders(self): - """Test custom encoder and decoder for an experiment.""" - metadata = {"complex": 2 + 3j, "numpy": np.zeros(2)} - expr_id = self._create_experiment(metadata=metadata, json_encoder=ExperimentEncoder) - rexp = self.provider.experiment.experiment(expr_id, json_decoder=ExperimentDecoder) - rmetadata = rexp["metadata"] - self.assertEqual(metadata["complex"], rmetadata["complex"]) - self.assertTrue((metadata["numpy"] == rmetadata["numpy"]).all()) - - new_metadata = {"complex": 4 + 5j, "numpy": np.ones(3)} - self.provider.experiment.update_experiment( - expr_id, metadata=new_metadata, json_encoder=ExperimentEncoder) - rexp = self.provider.experiment.experiment(expr_id, json_decoder=ExperimentDecoder) - rmetadata = rexp["metadata"] - self.assertEqual(new_metadata["complex"], rmetadata["complex"]) - self.assertTrue((new_metadata["numpy"] == rmetadata["numpy"]).all()) - - def test_analysis_result_coders(self): - """Test custom encoder and decoder for an analysis result.""" - data = {"complex": 2 + 3j, "numpy": np.zeros(2)} - result_id = self._create_analysis_result( - result_data=data, json_encoder=ExperimentEncoder) - rresult = self.provider.experiment.analysis_result( - result_id, json_decoder=ExperimentDecoder) - rdata = rresult["result_data"] - self.assertEqual(data["complex"], rdata["complex"]) - self.assertTrue((data["numpy"] == rdata["numpy"]).all()) - - new_data = {"complex": 4 + 5j, "numpy": np.ones(3)} - self.provider.experiment.update_analysis_result( - result_id, result_data=new_data, json_encoder=ExperimentEncoder) - rresult = self.provider.experiment.analysis_result( - result_id, json_decoder=ExperimentDecoder) - rdata = rresult["result_data"] - self.assertEqual(new_data["complex"], rdata["complex"]) - self.assertTrue((new_data["numpy"] == rdata["numpy"]).all()) - - def _create_experiment( - self, - experiment_type: Optional[str] = None, - backend_name: Optional[str] = None, - **kwargs - ) -> str: - """Create a new experiment.""" - experiment_type = experiment_type or 'qiskit_test' - backend_name = backend_name or self.backend.name() - exp_id = self.provider.experiment.create_experiment( - experiment_type=experiment_type, - backend_name=backend_name, - **kwargs - ) - self.experiments_to_delete.append(exp_id) - return exp_id - - def _create_analysis_result( - self, - exp_id: Optional[str] = None, - result_type: Optional[str] = None, - result_data: Optional[Dict] = None, - **kwargs: Any): - """Create a simple analysis result.""" - experiment_id = exp_id or self._create_experiment() - result_type = result_type or "qiskit_test" - result_data = result_data or {} - aresult_id = self.provider.experiment.create_analysis_result( - experiment_id=experiment_id, - result_data=result_data, - result_type=result_type, - **kwargs - ) - return aresult_id - - def _find_backend_device_components(self, min_components): - """Find a backend with the minimum number of device components.""" - backend_name = self.backend.name() - device_components = self.device_components - if len(device_components) < min_components: - all_components = self.provider.experiment.device_components() - for key, val in all_components.items(): - if len(val) >= min_components: - backend_name = key - device_components = val - break - if len(device_components) < min_components: - return None, None - - return backend_name, device_components diff --git a/test/ibm/experiment/utils.py b/test/ibm/experiment/utils.py deleted file mode 100644 index e2800b18a..000000000 --- a/test/ibm/experiment/utils.py +++ /dev/null @@ -1,50 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -# pylint: disable=method-hidden -# pylint: disable=arguments-differ - -"""Utility functions for experiment testing.""" - -from typing import Any -import json - -import numpy as np - - -class ExperimentEncoder(json.JSONEncoder): - """A test json encoder for experiments""" - - def default(self, obj: Any) -> Any: - if isinstance(obj, complex): - return {'__type__': 'complex', '__value__': [obj.real, obj.imag]} - if hasattr(obj, 'tolist'): - return {'__type__': 'array', '__value__': obj.tolist()} - - return json.JSONEncoder.default(self, obj) - - -class ExperimentDecoder(json.JSONDecoder): - """JSON Decoder for Numpy arrays and complex numbers.""" - - def __init__(self, *args, **kwargs): - super().__init__(object_hook=self.object_hook, *args, **kwargs) - - def object_hook(self, obj): - """Object hook.""" - if "__type__" in obj: - if obj["__type__"] == "complex": - val = obj["__value__"] - return val[0] + 1j * val[1] - if obj["__type__"] == "array": - return np.array(obj["__value__"]) - return obj diff --git a/test/ibm/test_ibm_provider.py b/test/ibm/test_ibm_provider.py index 30d6bca9d..eebd32717 100644 --- a/test/ibm/test_ibm_provider.py +++ b/test/ibm/test_ibm_provider.py @@ -24,7 +24,6 @@ from qiskit_ibm.ibm_backend import IBMSimulator, IBMBackend from qiskit_ibm.ibm_backend_service import IBMBackendService -from qiskit_ibm.experiment import IBMExperimentService from qiskit_ibm.random import IBMRandomService from qiskit_ibm.runtime import IBMRuntimeService from qiskit_ibm.ibm_provider import IBMProvider @@ -469,9 +468,6 @@ def test_provider_services(self): self.assertIsInstance(self.provider.service('backend'), IBMBackendService) self.assertIsInstance(self.provider.backend, IBMBackendService) - if 'experiment' in services: - self.assertIsInstance(self.provider.service('experiment'), IBMExperimentService) - self.assertIsInstance(self.provider.experiment, IBMExperimentService) if 'random' in services: self.assertIsInstance(self.provider.service('random'), IBMRandomService) self.assertIsInstance(self.provider.random, IBMRandomService)