diff --git a/qiskit_experiments/calibration_management/base_calibration_experiment.py b/qiskit_experiments/calibration_management/base_calibration_experiment.py index e0d427a65e..a53d9a6ffc 100644 --- a/qiskit_experiments/calibration_management/base_calibration_experiment.py +++ b/qiskit_experiments/calibration_management/base_calibration_experiment.py @@ -221,6 +221,7 @@ def run( self, backend: Optional[Backend] = None, analysis: Optional[Union[BaseAnalysis, None]] = "default", + timeout: Optional[float] = None, **run_options, ) -> ExperimentData: """Run an experiment, perform analysis, and update any calibrations. @@ -233,12 +234,16 @@ def run( analysis. If None analysis will not be run. If ``"default"`` the experiments :meth:`analysis` instance will be used if it contains one. + timeout: Time to wait for experiment jobs to finish running before + cancelling. run_options: backend runtime options used for circuit execution. Returns: The experiment data object. """ - experiment_data = super().run(backend, analysis, **run_options) + experiment_data = super().run( + backend=backend, analysis=analysis, timeout=timeout, **run_options + ) self._add_cal_metadata(experiment_data) diff --git a/qiskit_experiments/database_service/db_experiment_data.py b/qiskit_experiments/database_service/db_experiment_data.py index 12a6d94dd6..9c316859c4 100644 --- a/qiskit_experiments/database_service/db_experiment_data.py +++ b/qiskit_experiments/database_service/db_experiment_data.py @@ -12,11 +12,12 @@ """Stored data class.""" +import warnings import logging import dataclasses -import threading import uuid -from typing import Optional, List, Any, Union, Callable, Dict +import enum +from typing import Optional, List, Any, Union, Callable, Dict, Tuple import copy from concurrent import futures from functools import wraps @@ -39,7 +40,6 @@ save_data, qiskit_version, plot_to_svg_bytes, - combined_timeout, ThreadSafeOrderedDict, ThreadSafeList, ) @@ -69,14 +69,37 @@ def service_exception_to_warning(): LOG.warning("Experiment service operation failed: %s", traceback.format_exc()) +class ExperimentStatus(enum.Enum): + """Class for experiment status enumerated type.""" + + EMPTY = "experiment data is empty" + INITIALIZING = "experiment jobs are being initialized" + VALIDATING = "experiment jobs are validating" + QUEUED = "experiment jobs are queued" + RUNNING = "experiment jobs is actively running" + CANCELLED = "experiment jobs or analysis has been cancelled" + POST_PROCESSING = "experiment analysis is actively running" + DONE = "experiment jobs and analysis have successfully run" + ERROR = "experiment jobs or analysis incurred an error" + + +class AnalysisStatus(enum.Enum): + """Class for analysis callback status enumerated type.""" + + QUEUED = "analysis callback is queued" + RUNNING = "analysis callback is actively running" + CANCELLED = "analysis callback has been cancelled" + DONE = "analysis callback has successfully run" + ERROR = "analysis callback incurred an error" + + @dataclasses.dataclass -class CallbackStatus: +class AnalysisCallback: """Dataclass for analysis callback status""" - callback: Callable - kwargs: Dict = dataclasses.field(default_factory=dict) - status: JobStatus = JobStatus.QUEUED - event: threading.Event = dataclasses.field(default_factory=threading.Event) + name: str = "" + callback_id: str = "" + status: AnalysisStatus = AnalysisStatus.QUEUED error_msg: Optional[str] = None @@ -104,8 +127,7 @@ class DbExperimentDataV1(DbExperimentData): version = 1 verbose = True # Whether to print messages to the standard output. _metadata_version = 1 - _executor = futures.ThreadPoolExecutor() - """Threads used for asynchronous processing.""" + _job_executor = futures.ThreadPoolExecutor() _json_encoder = ExperimentEncoder _json_decoder = ExperimentDecoder @@ -165,9 +187,10 @@ def __init__( self._notes = notes or "" self._jobs = ThreadSafeOrderedDict(job_ids or []) - self._job_futures = ThreadSafeList() - self._callback_statuses = ThreadSafeOrderedDict() - self._callback_future = None + self._job_futures = ThreadSafeOrderedDict() + self._analysis_callbacks = ThreadSafeOrderedDict() + self._analysis_futures = ThreadSafeOrderedDict() + self._analysis_executor = futures.ThreadPoolExecutor(max_workers=1) self._data = ThreadSafeList() self._figures = ThreadSafeOrderedDict(figure_names or []) @@ -211,39 +234,89 @@ def add_data( data: Experiment data to add. Several types are accepted for convenience * Result: Add data from this ``Result`` object. * List[Result]: Add data from the ``Result`` objects. - * Job: Add data from the job result. - * List[Job]: Add data from the job results. * Dict: Add this data. * List[Dict]: Add this list of data. - timeout: Timeout waiting for job to finish, if `data` is a ``Job``. + * Job: (Deprecated) Add data from the job result. + * List[Job]: (Deprecated) Add data from the job results. + timeout: (Deprecated) Timeout waiting for job to finish, if `data` is a ``Job``. Raises: TypeError: If the input data type is invalid. """ - if any(not status.event.is_set() for status in self._callback_statuses.values()): + if any(not future.done() for future in self._analysis_futures.values()): LOG.warning( - "Not all post-processing has finished. Adding new data " - "may create unexpected analysis results." + "Not all analysis has finished running. Adding new data may " + "create unexpected analysis results." ) - if not isinstance(data, list): data = [data] - # Extract job data and directly add non-job data - job_data = [] + # Extract job data (Deprecated) and directly add non-job data + jobs = [] with self._data.lock: for datum in data: if isinstance(datum, (Job, BaseJob)): - job_data.append(datum) + jobs.append(datum) elif isinstance(datum, dict): - self._add_single_data(datum) + self._data.append(datum) elif isinstance(datum, Result): self._add_result_data(datum) else: raise TypeError(f"Invalid data type {type(datum)}.") - # Add futures for job data - for job in job_data: + # Remove after deprecation is finished + if jobs: + warnings.warn( + "Passing Jobs to the `add_data` method is deprecated as of " + "qiskit-experiments 0.3.0 and will be removed in the 0.4.0 release. " + "Use the `add_jobs` method to add jobs instead.", + DeprecationWarning, + stacklevel=2, + ) + if timeout is not None: + warnings.warn( + "The `timeout` kwarg of is deprecated as of " + "qiskit-experiments 0.3.0 and will be removed in the 0.4.0 release. " + "Use the `add_jobs` method to add jobs with timeout.", + DeprecationWarning, + stacklevel=2, + ) + self.add_jobs(jobs, timeout=timeout) + + def add_jobs( + self, + jobs: Union[Job, List[Job]], + timeout: Optional[float] = None, + ) -> None: + """Add experiment data. + + Args: + jobs: The Job or list of Jobs to add result data from. + timeout: Optional, time in seconds to wait for all jobs to finish + before cancelling them. + + Raises: + TypeError: If the input data type is invalid. + + .. note:: + If a timeout is specified the :meth:`cancel_jobs` method will be + called after timing out to attempt to cancel any unfinished jobs. + + If you want to wait for jobs without cancelling, use the timeout + kwarg of :meth:`block_for_results` instead. + """ + if any(not future.done() for future in self._analysis_futures.values()): + LOG.warning( + "Not all analysis has finished running. Adding new jobs may " + "create unexpected analysis results." + ) + if isinstance(jobs, (Job, BaseJob)): + jobs = [jobs] + + # Add futures for extracting finished job data + timeout_ids = [] + for job in jobs: + jid = job.job_id() if self.backend and self.backend.name() != job.backend().name(): LOG.warning( "Adding a job from a backend (%s) that is different " @@ -257,20 +330,90 @@ def add_data( if not self._service: self._set_service_from_backend(self._backend) - self._jobs[job.job_id()] = job + if jid in self._jobs: + LOG.warning( + "Skipping duplicate job, a job with this ID already exists [Job ID: %s]", jid + ) + else: + self._jobs[jid] = job + if jid in self._job_futures: + LOG.warning("Job future has already been submitted [Job ID: %s]", jid) + else: + self._add_job_future(job) + if timeout is not None: + timeout_ids.append(jid) - if job_data: - job_kwargs = { - "jobs": job_data, - "timeout": timeout, - } - self._job_futures.append( - (job_kwargs, self._executor.submit(self._add_jobs_data, **job_kwargs)) - ) + # Add future for cancelling jobs that timeout + if timeout_ids: + self._job_executor.submit(self._timeout_running_jobs, timeout_ids, timeout) if self.auto_save: self.save_metadata() + def _timeout_running_jobs(self, job_ids, timeout): + """Function for cancelling jobs after timeout length. + + This function should be submitted to an executor to run as a future. + + Args: + job_ids: the IDs of jobs to wait for. + timeout: The total time to wait for all jobs before cancelling. + """ + futs = [self._job_futures[jid] for jid in job_ids] + waited = futures.wait(futs, timeout=timeout) + + # Try to cancel timed-out jobs + if waited.not_done: + LOG.debug("Cancelling running jobs that exceeded add_jobs timeout.") + done_ids = {fut.result()[0] for fut in waited.done} + notdone_ids = [jid for jid in job_ids if jid not in done_ids] + self.cancel_jobs(notdone_ids) + + def _add_job_future(self, job): + """Submit new _add_job_data job to executor""" + jid = job.job_id() + if jid in self._job_futures: + LOG.warning("Job future has already been submitted [Job ID: %s]", jid) + else: + self._job_futures[jid] = self._job_executor.submit(self._add_job_data, job) + + def _add_job_data( + self, + job: Union[Job, BaseJob], + ) -> Tuple[str, bool]: + """Wait for a job to finish and add job result data. + + Args: + job: the Job to wait for and add data from. + + Returns: + A tuple (str, bool) of the job id and bool of if the job data was added. + + Raises: + Exception: If an error occured when adding job data. + """ + jid = job.job_id() + try: + job_result = job.result() + self._add_result_data(job_result) + LOG.debug("Job data added [Job ID: %s]", jid) + return jid, True + except Exception as ex: # pylint: disable=broad-except + # Handle cancelled jobs + status = job.status() + if status == JobStatus.CANCELLED: + LOG.warning("Job was cancelled before completion [Job ID: %s]", jid) + return jid, False + if status == JobStatus.ERROR: + LOG.error( + "Job data not added for errorred job [Job ID: %s]" "\nError message: %s", + jid, + job.error_message(), + ) + return jid, False + LOG.warning("Addind data from job failed [Job ID: %s]", job.job_id()) + raise ex + def add_analysis_callback(self, callback: Callable, **kwargs: Any): """Add analysis callback for running after experiment data jobs are finished. @@ -288,80 +431,69 @@ def add_analysis_callback(self, callback: Callable, **kwargs: Any): keywork arguments passed to this method. **kwargs: Keyword arguments to be passed to the callback function. """ - callback_id = uuid.uuid4() - self._callback_statuses[callback_id] = CallbackStatus(callback, kwargs=kwargs) - - # Wrap callback function to handle reporting status and catching - # any exceptions and their error messages - def _wrapped_callback(): - try: - self._callback_statuses[callback_id].status = JobStatus.RUNNING - callback(self, **kwargs) - self._callback_statuses[callback_id].status = JobStatus.DONE - except Exception as ex: # pylint: disable=broad-except - self._callback_statuses[callback_id].status = JobStatus.ERROR - error_msg = f"Analysis callback {callback} failed: \n" "".join( - traceback.format_exception(type(ex), ex, ex.__traceback__) - ) - self._callback_statuses[callback_id].error_msg = error_msg - LOG.warning("Analysis callback %s failed:\n%s", callback, traceback.format_exc()) - self._callback_statuses[callback_id].event.set() - - with self._job_futures.lock: - # Determine if a future is running that we need to add callback to - fut_done = True - if self._job_futures: - _, fut = self._job_futures[-1] - fut_done = fut.done() - if fut_done and self._callback_future is not None: - fut = self._callback_future - fut_done = fut.done() - if fut_done: - fut = None - - if fut_done: - # Submit future so analysis can run async even if there are no - # running jobs or running analysis. - self._callback_future = self._executor.submit(_wrapped_callback) - else: - # Wrap the wrapped function for the format expected by Python - # Future.add_done_callback - def _done_callback(fut): - if fut.cancelled(): - self._callback_statuses[callback_id].status = JobStatus.CANCELLED - self._callback_statuses[callback_id].event.set() - else: - _wrapped_callback() - - fut.add_done_callback(_done_callback) - - def _add_jobs_data( - self, - jobs: List[Union[Job, BaseJob]], - timeout: Optional[float] = None, - ) -> None: - """Wait for a job to finish and add job result data. + with self._job_futures.lock and self._analysis_futures.lock: + # Create callback dataclass + cid = uuid.uuid4().hex + self._analysis_callbacks[cid] = AnalysisCallback( + name=callback.__name__, + callback_id=cid, + status=AnalysisStatus.QUEUED, + ) - Args: - jobs: Jobs to wait for. - timeout: Timeout waiting for job to finish. + # Get futures to wait for before running analysis + if self._analysis_futures: + # If analysis futures are present wait for all previous analysis + # futures to complete before running the latest one + futs = self._analysis_futures.values() + else: + # Create a dummy future to wait for jobs to finish. + # This allows subsequent analysis futures to be cancelled as they + # will be queued by the ThreadPoolExecutor while this future + # is still running + futs = [self._analysis_executor.submit(self._wait_for_jobs)] + + # Add run analysis future + self._analysis_futures[cid] = self._analysis_executor.submit( + self._run_analysis_callback, futs, cid, callback, **kwargs + ) - Raises: - Exception: If any of the jobs failed. - """ - for job in jobs: - LOG.debug("Waiting for job %s to finish.", job.job_id()) - try: - try: - job_result, timeout = combined_timeout(job.result, timeout) - except TypeError: # Not all jobs take timeout. - job_result = job.result() - with self._data.lock: - # Hold the lock so we add the block of results together. - self._add_result_data(job_result) - except Exception: # pylint: disable=broad-except - LOG.warning("Job %s failed:\n%s", job.job_id(), traceback.format_exc()) - raise + def _run_analysis_callback( + self, futs: List[futures.Future], callback_id: str, callback: Callable, **kwargs + ): + """Run an analysis callback after specified futures have finished.""" + if callback_id not in self._analysis_callbacks: + raise ValueError(f"No analysis callback with id {callback_id}") + + # Wait for previous futures to finish + if futs: + waited = futures.wait(futs) + # Check all previous futures finished, and cancel analysis callback if not + for fut in waited.done: + if fut.cancelled() or fut.exception() or not fut.result()[1]: + self._analysis_callbacks[callback_id].status = AnalysisStatus.CANCELLED + LOG.warning("Cancelled analysis callback [Analysis ID: %s]", callback_id) + return callback_id, False + + # Run callback function + try: + self._analysis_callbacks[callback_id].status = AnalysisStatus.RUNNING + LOG.debug( + "Running analysis callback '%s' [Analysis ID: %s]", + self._analysis_callbacks[callback_id].name, + callback_id, + ) + callback(self, **kwargs) + self._analysis_callbacks[callback_id].status = AnalysisStatus.DONE + LOG.debug("Analysis callback finished [Analysis ID: %s]", callback_id) + return callback_id, True + except Exception as ex: # pylint: disable=broad-except + self._analysis_callbacks[callback_id].status = AnalysisStatus.ERROR + error_msg = f"Analysis callback failed [Analysis ID: {callback_id}]:\n" "".join( + traceback.format_exception(type(ex), ex, ex.__traceback__) + ) + self._analysis_callbacks[callback_id].error_msg = error_msg + LOG.warning(error_msg) + return callback_id, False def _add_result_data(self, result: Result) -> None: """Add data from a Result object @@ -371,47 +503,50 @@ def _add_result_data(self, result: Result) -> None: """ if result.job_id not in self._jobs: self._jobs[result.job_id] = None - for i in range(len(result.results)): - data = result.data(i) - data["job_id"] = result.job_id - if "counts" in data: - # Format to Counts object rather than hex dict - data["counts"] = result.get_counts(i) - expr_result = result.results[i] - if hasattr(expr_result, "header") and hasattr(expr_result.header, "metadata"): - data["metadata"] = expr_result.header.metadata - data["shots"] = expr_result.shots - data["meas_level"] = expr_result.meas_level - if hasattr(expr_result, "meas_return"): - data["meas_return"] = expr_result.meas_return - self._add_single_data(data) - - def _add_single_data(self, data: Dict[str, any]) -> None: - """Add a single data dictionary to the experiment. - - Args: - data: Data to be added. - """ - self._data.append(data) + with self._data.lock: + # Lock data while adding all result data + for i, _ in enumerate(result.results): + data = result.data(i) + data["job_id"] = result.job_id + if "counts" in data: + # Format to Counts object rather than hex dict + data["counts"] = result.get_counts(i) + expr_result = result.results[i] + if hasattr(expr_result, "header") and hasattr(expr_result.header, "metadata"): + data["metadata"] = expr_result.header.metadata + data["shots"] = expr_result.shots + data["meas_level"] = expr_result.meas_level + if hasattr(expr_result, "meas_return"): + data["meas_return"] = expr_result.meas_return + self._data.append(data) def _retrieve_data(self): """Retrieve job data if missing experiment data.""" + if self._data or not self._backend: + return # Get job results if missing experiment data. - if (not self._data) and self._backend: - with self._jobs.lock: - for jid, job in self._jobs.items(): - if job is None: - try: - job = self._backend.retrieve_job(jid) - self._jobs[jid] = job - except Exception: # pylint: disable=broad-except - LOG.warning( - "Unable to retrive data from job %s on backend %s", - jid, - self._backend, - ) - if job is not None: - self._add_result_data(job.result()) + retrieved_jobs = {} + for jid, job in self._jobs.items(): + if job is None: + try: + LOG.debug("Retrieving job from backend %s [Job ID: %s]", self._backend, jid) + job = self._backend.retrieve_job(jid) + retrieved_jobs[jid] = job + except Exception: # pylint: disable=broad-except + LOG.warning( + "Unable to retrive data from job on backend %s [Job ID: %s]", + self._backend, + jid, + ) + # Add retrieved job objects to stored jobs and extract data + for jid, job in retrieved_jobs.items(): + self._jobs[jid] = job + if job.status() in JOB_FINAL_STATES: + # Add job results synchronously + self._add_job_data(job) + else: + # Add job results asynchronously + self._add_job_future(job) def data( self, @@ -709,7 +844,7 @@ def analysis_results( DbExperimentEntryNotFound: If the entry cannot be found. """ if block: - self._wait_for_callbacks(timeout=timeout) + self._wait_for_analysis(timeout=timeout) self._retrieve_analysis_results(refresh=refresh) if index is None: return self._analysis_results.values() @@ -779,13 +914,6 @@ def _save_experiment_metadata(self) -> None: LOG.warning("Experiment cannot be saved because backend is missing.") return - with self._job_futures.lock: - if any(not fut.done() for _, fut in self._job_futures) and not self.auto_save: - LOG.warning( - "Not all post-processing has finished. Consider calling " - "save() again after all post-processing is done to save any newly " - "generated data." - ) metadata = copy.deepcopy(self._metadata) metadata["_source"] = self._source @@ -862,8 +990,8 @@ def save(self) -> None: if self.verbose: print( - "You can view the experiment online at https://quantum-computing.ibm.com/experiments/" - + self.experiment_id + "You can view the experiment online at " + "https://quantum-computing.ibm.com/experiments/" + self.experiment_id ) @classmethod @@ -910,161 +1038,271 @@ def load(cls, experiment_id: str, service: DatabaseServiceV1) -> "DbExperimentDa expdata._created_in_db = True return expdata - def cancel_jobs(self) -> None: - """Cancel any running jobs.""" - for kwargs, fut in self._job_futures.copy(): - if not fut.done(): - for job in kwargs["jobs"]: - if job.status() not in JOB_FINAL_STATES: - try: - job.cancel() - except Exception as err: # pylint: disable=broad-except - LOG.info("Unable to cancel job %s: %s", job.job_id(), err) + def jobs(self) -> List[Job]: + """Return a list of jobs for the experiment""" + return self._jobs.values() + + def cancel_jobs(self, ids: Optional[List[str]] = None) -> bool: + """Cancel any running jobs. + + Args: + ids: Jobs to cancel. If None all non-finished jobs will be cancelled. + + Returns: + True if the specified jobs were successfully cancelled + otherwise false. + """ + cancelled = True + + with self._jobs.lock: + if ids is None: + jobs = self._jobs.values() + else: + jobs = [self._jobs[jid] for jid in ids] + + for job in jobs: + if job and job.status() not in JOB_FINAL_STATES: + try: + job.cancel() + LOG.debug("Cancelled job [Job ID: %s]", job.job_id()) + except Exception as err: # pylint: disable=broad-except + cancelled = False + LOG.warning("Unable to cancel job [Job ID: %s]:\n%s", job.job_id(), err) + return cancelled + + def cancel_analysis(self, ids: Optional[List[str]] = None) -> bool: + """Cancel any queued analysis callbacks. + + .. note:: + A currently running analysis callback cannot be cancelled. + + Args: + ids: Analysis callbacks to cancel. If None all non-finished + analysis will be cancelled. + + Returns: + True if the specified analysis callbacks were successfully + cancelled otherwise false. + """ + cancelled = True + with self._analysis_futures.lock: + if ids is None: + ids = self._analysis_futures.keys() + futs = self._analysis_futures.values() + else: + futs = [self._analysis_futures[i] for i in ids] + for cid, fut in zip(ids, futs): + if fut.done(): + continue + if fut.cancel(): + self._analysis_callbacks[cid].status = AnalysisStatus.CANCELLED + LOG.info("Cancelled analysis callback [Analysis ID: %s]", cid) + else: + cancelled = False + LOG.warning("Unable to cancel analysis callback [Analysis ID: %s]", cid) + return cancelled + + def cancel(self) -> bool: + """Attempt to cancel any running jobs and queued analysis callbacks. + + .. note:: + A running analysis callback cannot be cancelled. + + Returns: + True if all jobs and analysis are successfully cancelled, otherwise false. + """ + return self.cancel_jobs() and self.cancel_analysis() def block_for_results(self, timeout: Optional[float] = None) -> "DbExperimentDataV1": """Block until all pending jobs and analysis callbacks finish. Args: - timeout: Timeout waiting for results. + timeout: Timeout in seconds for waiting for results. Returns: The experiment data with finished jobs and post-processing. """ - _, timeout = combined_timeout(self._wait_for_jobs, timeout) - _, timeout = combined_timeout(self._wait_for_callbacks, timeout) + if self._analysis_futures: + self._wait_for_analysis(timeout) + else: + self._wait_for_jobs(timeout) + self._removed_done_futures() return self def _wait_for_jobs(self, timeout: Optional[float] = None): """Wait for jobs to finish running""" - # Wait for jobs to finish - for kwargs, fut in self._job_futures.copy(): - jobs = [job.job_id() for job in kwargs["jobs"]] - LOG.info("Waiting for data job %s to finish.", jobs) - try: - _, timeout = combined_timeout(fut.result, timeout) - except futures.TimeoutError: - LOG.warning( - "Possibly incomplete experiment data: Retrieving a job's result timed out." - ) - except Exception: # pylint: disable = broad-except - LOG.warning( - "Possibly incomplete experiment data: Retrieving a job's result" - " raised an exception." + waited = futures.wait(self._job_futures.values(), timeout=timeout) + value = True + # Log jobs still running after timeout + if waited.not_done: + LOG.info( + "Waiting for jobs timed out before all jobs completed [Experiment ID: %s].", + self.experiment_id, + ) + value = False + + # Check for jobs that were cancelled or errorred + excepts = "" + for fut in waited.done: + ex = fut.exception() + if ex: + excepts += "\n".join(traceback.format_exception(type(ex), ex, ex.__traceback__)) + value = False + elif fut.cancelled(): + LOG.debug( + "Job futures were cancelled before completion [Experiment ID: %s]", + self.experiment_id, ) - # Check job status and show warning if cancelled or error - jobs_status = self._job_status() - if jobs_status == "CANCELLED": - LOG.warning("Possibly incomplete experiment data: a Job was cancelled.") - elif jobs_status == "ERROR": - LOG.warning("Possibly incomplete experiment data: A Job returned an error.") - - def _wait_for_callbacks(self, timeout: Optional[float] = None): + value = False + if excepts: + LOG.error( + "Job futures raised exceptions [Experiment ID: %s]:%s", self.experiment_id, excepts + ) + return "_wait_for_jobs", value + + def _wait_for_analysis(self, timeout: Optional[float] = None): """Wait for analysis callbacks to finish""" - # Wait for analysis callbacks to finish - if self._callback_statuses: - for status in self._callback_statuses.values(): - if status.status in [JobStatus.DONE, JobStatus.CANCELLED]: - continue - LOG.info("Waiting for analysis callback %s to finish.", status.callback) - finished, timeout = combined_timeout(status.event.wait, timeout) - if not finished: - LOG.warning( - "Possibly incomplete analysis results:" - " analysis" - " callback %s timed out.", - status.callback, - ) + waited = futures.wait(self._analysis_futures.values(), timeout=timeout) + # Log jobs still running after timeout + if waited.not_done: + LOG.info( + "Waiting for analysis callbacks timed out before all " + "callbacks completed [Experiment ID: %s].", + self.experiment_id, + ) - # Check analysis status and show warning if cancelled or error - callback_status = self._callback_status() - if callback_status == "CANCELLED": - LOG.warning("Possibly incomplete analysis results: an analysis callback was cancelled.") - elif callback_status == "ERROR": - LOG.warning( - "Possibly incomplete analysis results: an analysis callback raised an error." + # Check for jobs that were cancelled or errorred + excepts = "" + for fut in waited.done: + ex = fut.exception() + if ex: + excepts += "\n".join(traceback.format_exception(type(ex), ex, ex.__traceback__)) + elif fut.cancelled(): + LOG.debug( + "Analysis callbacks were cancelled before completion [Experiment ID: %s]", + self.experiment_id, + ) + if excepts: + LOG.error( + "Analysis callbacks raised exceptions [Experiment ID: %s]:%s", + self.experiment_id, + excepts, ) - def status(self) -> str: - """Return the data processing status. + def _removed_done_futures(self): + """Remove futures that have finished""" + + def _keep_future(fut): + return not fut.done() or fut.cancelled() or fut.exception() + + with self._analysis_futures.lock and self._job_futures.lock: + not_done_analysis = [ + (cid, fut) for cid, fut in self._analysis_futures.items() if _keep_future(fut) + ] + self._analysis_futures = ThreadSafeOrderedDict(not_done_analysis) + + not_done_jobs = [ + (jid, fut) for jid, fut in self._job_futures.items() if _keep_future(fut) + ] + self._job_futures = ThreadSafeOrderedDict(not_done_jobs) + + def status(self) -> ExperimentStatus: + """Return the experiment status. - If the experiment consists of multiple jobs, the returned status is mapped - in the following order: + Possible return values for :class:`.ExperimentStatus` are - * ERROR - if any job incurred an error. - * CANCELLED - if any job is cancelled. - * RUNNING - if any job is still running. - * QUEUED - if any job is queued. - * VALIDATING - if any job is being validated. - * INITIALIZING - if any job is being initialized. - * POST_PROCESSING - if any analysis callbacks are still running - * DONE - if all jobs and analysis callbacks are finished. + * :attr:`~.ExperimentStatus.EMPTY` - experiment data is empty + * :attr:`~.ExperimentStatus.INITIALIZING` - experiment jobs are being initialized + * :attr:`~.ExperimentStatus.QUEUED` - experiment jobs are queued + * :attr:`~.ExperimentStatus.RUNNING` - experiment jobs is actively running + * :attr:`~.ExperimentStatus.CANCELLED` - experiment jobs or analysis has been cancelled + * :attr:`~.ExperimentStatus.POST_PROCESSING` - experiment analysis is actively running + * :attr:`~.ExperimentStatus.DONE` - experiment jobs and analysis have successfully run + * :attr:`~.ExperimentStatus.ERROR` - experiment jobs or analysis incurred an error .. note:: - If an experiment has status ERROR or CANCELLED there may still - be pending or running jobs. In these cases it may be beneficial - to call :meth:`cancel_jobs` to terminate these remaining jobs. + If an experiment has status :attr:`~.ExperimentStatus.ERROR` + there may still be pending or running jobs. In these cases it + may be beneficial to call :meth:`cancel_jobs` to terminate these + remaining jobs. Returns: - Data processing status. + The experiment status. """ if all( len(container) == 0 for container in [ self._data, self._jobs, - self._callback_statuses, + self._job_futures, + self._analysis_callbacks, + self._analysis_futures, self._figures, self._analysis_results, ] ): - return "INITIALIZING" + return ExperimentStatus.EMPTY - job_status = self._job_status() - if job_status != "DONE": - return job_status - - callback_status = self._callback_status() - if callback_status in ["QUEUED", "RUNNING"]: - return "POST_PROCESSING" + # Return job status is job is not DONE + try: + return { + JobStatus.INITIALIZING: ExperimentStatus.INITIALIZING, + JobStatus.QUEUED: ExperimentStatus.QUEUED, + JobStatus.VALIDATING: ExperimentStatus.VALIDATING, + JobStatus.RUNNING: ExperimentStatus.RUNNING, + JobStatus.CANCELLED: ExperimentStatus.CANCELLED, + JobStatus.ERROR: ExperimentStatus.ERROR, + }[self.job_status()] + except KeyError: + pass - return callback_status + # Return analysis status if Done, cancelled or error + try: + return { + AnalysisStatus.DONE: ExperimentStatus.DONE, + AnalysisStatus.CANCELLED: ExperimentStatus.CANCELLED, + AnalysisStatus.ERROR: ExperimentStatus.ERROR, + }[self.analysis_status()] + except KeyError: + return ExperimentStatus.POST_PROCESSING - def _job_status(self) -> str: + def job_status(self) -> JobStatus: """Return the experiment job execution status. - If the experiment consists of multiple jobs, the returned status is mapped - in the following order: + Possible return values for :class:`.JobStatus` are - * ERROR - if any job incurred an error. - * CANCELLED - if any job is cancelled. - * RUNNING - if any job is still running. - * QUEUED - if any job is queued. - * VALIDATING - if any job is being validated. - * INITIALIZING - if any job is being initialized. - * DONE - if all jobs are finished. + * :attr:`~.JobStatus.ERROR` - if any job incurred an error + * :attr:`~.JobStatus.CANCELLED` - if any job is cancelled. + * :attr:`~.JobStatus.RUNNING` - if any job is still running. + * :attr:`~.JobStatus.QUEUED` - if any job is queued. + * :attr:`~.JobStatus.VALIDATING` - if any job is being validated. + * :attr:`~.JobStatus.INITIALIZING` - if any job is being initialized. + * :attr:`~.JobStatus.DONE` - if all jobs are finished. + + .. note:: + + If an experiment has status :attr:`~.JobStatus.ERROR` or + :attr:`~.JobStatus.CANCELLED` there may still be pending or + running jobs. In these cases it may be beneficial to call + :meth:`cancel_jobs` to terminate these remaining jobs. Returns: - Job execution status. + The job execution status. """ - # Backend jobs statuses = set() - with self._job_futures.lock: - for idx, (kwargs, fut) in enumerate(self._job_futures): - all_jobs_done = True - for job in kwargs["jobs"]: - job_status = job.status() - statuses.add(job_status) - if job_status != JobStatus.DONE: - all_jobs_done = False - if fut.done(): - if fut.exception(): - statuses.add(JobStatus.ERROR) - elif all_jobs_done: - # If all jobs ran successfully we can remove the future - self._job_futures[idx] = None - self._job_futures = ThreadSafeList(list(filter(None, self._job_futures))) + with self._jobs.lock: + + # No jobs present + if not self._jobs: + return JobStatus.DONE + + statuses = set() + for job in self._jobs.values(): + if job: + statuses.add(job.status()) + # If any jobs are in non-DONE state return that state for stat in [ JobStatus.ERROR, JobStatus.CANCELLED, @@ -1074,77 +1312,92 @@ def _job_status(self) -> str: JobStatus.INITIALIZING, ]: if stat in statuses: - return stat.name + return stat - return "DONE" + return JobStatus.DONE - def _callback_status(self) -> str: - """Return the data analysis callback post-processing status. + def analysis_status(self) -> AnalysisStatus: + """Return the data analysis post-processing status. - If the experiment consists of multiple analysis callbacks, the returned - status is mapped in the following order: + Possible return values for :class:`.AnalysisStatus` are - * ERROR - if any analysis callback incurred an error. - * CANCELLED - if any analysis callback is cancelled. - * RUNNING - if any analysis callbacks are still running. - * QUEUED - if any analysis callback is queued. - * DONE - if all analysis callbacks are finished. + * :attr:`~.AnalysisStatus.ERROR` - if any analysis callback incurred an error + * :attr:`~.AnalysisStatus.CANCELLED` - if any analysis callback is cancelled. + * :attr:`~.AnalysisStatus.RUNNING` - if any analysis callback is actively running. + * :attr:`~.AnalysisStatus.QUEUED` - if any analysis callback is queued. + * :attr:`~.AnalysisStatus.DONE` - if all analysis callbacks have successfully run. Returns: - Analysis callback status. + Then analysis status. """ statuses = set() - for status in self._callback_statuses.values(): + for status in self._analysis_callbacks.values(): statuses.add(status.status) - # Remove analysis future if it is done, since we store all statuses - # In the _callback_status field. - if self._callback_future is not None and self._callback_future.done(): - self._callback_future = None - for stat in [ - JobStatus.ERROR, - JobStatus.CANCELLED, - JobStatus.RUNNING, - JobStatus.QUEUED, + AnalysisStatus.ERROR, + AnalysisStatus.CANCELLED, + AnalysisStatus.RUNNING, + AnalysisStatus.QUEUED, ]: if stat in statuses: - return stat.name + return stat - return "DONE" + return AnalysisStatus.DONE - def errors(self) -> str: - """Return errors encountered. - - Returns: - Experiment errors. - """ + def job_errors(self) -> str: + """Return any errors encountered in job execution.""" errors = [] - # Get any future errors - for fut_kwargs, fut in self._job_futures: - if fut.done(): - ex = fut.exception() - if ex: - jobs = [job.job_id() for job in fut_kwargs["jobs"]] - errors.append( - f"Job {jobs} failed: \n" - + "".join(traceback.format_exception(type(ex), ex, ex.__traceback__)) - ) # Get any job errors for job in self._jobs.values(): if job and job.status() == JobStatus.ERROR: - job_err = "." if hasattr(job, "error_message"): - job_err = ": " + job.error_message() - errors.append(f"Job {job.job_id()} failed{job_err}") + error_msg = job.error_message() + else: + error_msg = "" + errors.append(f"\n[Job ID: {job.job_id()}]: {error_msg}") + + # Get any job futures errors: + for jid, fut in self._job_futures.items(): + if fut and fut.exception(): + ex = fut.exception() + errors.append( + f"[Job ID: {jid}]" + "\n".join(traceback.format_exception(type(ex), ex, ex.__traceback__)) + ) + return "".join(errors) + + def analysis_errors(self) -> str: + """Return any errors encountered during analysis callbacks.""" + errors = [] + + # Get any callback errors + for cid, callback in self._analysis_callbacks.items(): + if callback.status == AnalysisStatus.ERROR: + errors.append(f"\n[Analysis ID: {cid}]: {callback.error_msg}") + + # Get any callback futures errors: + for cid, fut in self._analysis_futures.items(): + ex = fut.exception() + if fut.exception(): + errors.append( + f"[Analysis ID: {cid}]" + "\n".join(traceback.format_exception(type(ex), ex, ex.__traceback__)) + ) + return "".join(errors) - # Get any analysis callback errors - for status in self._callback_statuses.values(): - if status.error_msg is not None: - errors.append(status.error_msg) + def errors(self) -> str: + """Return errors encountered during job and analysis execution. + + .. note:: + To display only job or analysis errors use the + :meth:`job_errors` or :meth:`analysis_errors` methods. - return "\n".join(errors) + Returns: + Experiment errors. + """ + return self.job_errors() + self.analysis_errors() def copy(self, copy_results: bool = True) -> "DbExperimentDataV1": """Make a copy of the experiment data with a new experiment ID. @@ -1164,6 +1417,11 @@ def copy(self, copy_results: bool = True) -> "DbExperimentDataV1": new result IDs and figure names generated for the copies. """ new_instance = self.__class__() + LOG.debug( + "Copying experiment data [Experiment ID: %s]: %s", + self.experiment_id, + new_instance.experiment_id, + ) # Copy basic properties and metadata new_instance._type = self.experiment_type @@ -1180,21 +1438,9 @@ def copy(self, copy_results: bool = True) -> "DbExperimentDataV1": # Copy circuit result data and jobs with self._data.lock: # Hold the lock so no new data can be added. new_instance._data = self._data.copy_object() - for orig_kwargs, fut in self._job_futures.copy(): - if fut.done(): - continue - # We cannot deep copy orig_kwargs because it contains a Job which - # inherits an abstract class. - extra_kwargs = {} - for key, val in orig_kwargs.items(): - if key not in ["jobs", "timeout"]: - extra_kwargs[key] = val - - new_instance.add_data( - data=orig_kwargs["jobs"], - timeout=orig_kwargs["timeout"], - **extra_kwargs, - ) + for jid, fut in self._job_futures.items(): + if not fut.done(): + new_instance._add_job_future(new_instance._jobs[jid]) # If not copying results return the object if not copy_results: @@ -1202,7 +1448,7 @@ def copy(self, copy_results: bool = True) -> "DbExperimentDataV1": # Copy results and figures. # This requires analysis callbacks to finish - self._wait_for_callbacks() + self._wait_for_analysis() with self._analysis_results.lock: new_instance._analysis_results = ThreadSafeOrderedDict() new_instance.add_analysis_results([result.copy() for result in self.analysis_results()]) diff --git a/qiskit_experiments/database_service/utils.py b/qiskit_experiments/database_service/utils.py index 412f32b2b6..dc539db616 100644 --- a/qiskit_experiments/database_service/utils.py +++ b/qiskit_experiments/database_service/utils.py @@ -14,7 +14,6 @@ import io import logging -import time import threading import traceback from abc import ABC, abstractmethod @@ -106,27 +105,6 @@ def plot_to_svg_bytes(figure: "pyplot.Figure") -> bytes: return figure_data -def combined_timeout( - func: Callable, timeout: Optional[float] = None -) -> Tuple[Any, Union[float, None]]: - """Call func(timeout) and return reduced timeout for subsequent funcs. - - Args: - func: A function with signature func(timeout). - timeout: The time to wait for function call. - - Returns: - A pair of the function return and the updated timeout variable - for remaining time left to wait for other functions. - """ - time_start = time.time() - ret = func(timeout) - time_stop = time.time() - if timeout is not None: - timeout = max(0, timeout + time_start - time_stop) - return ret, timeout - - def save_data( is_new: bool, new_func: Callable, diff --git a/qiskit_experiments/framework/__init__.py b/qiskit_experiments/framework/__init__.py index baace9f39e..656c815f85 100644 --- a/qiskit_experiments/framework/__init__.py +++ b/qiskit_experiments/framework/__init__.py @@ -203,6 +203,9 @@ :toctree: ../stubs/ ExperimentData + ExperimentStatus + JobStatus + AnalysisStatus FitVal AnalysisResultData ExperimentConfig @@ -235,6 +238,11 @@ from qiskit.providers.options import Options from qiskit_experiments.database_service.db_analysis_result import DbAnalysisResultV1 from qiskit_experiments.database_service.db_fitval import FitVal +from qiskit_experiments.database_service.db_experiment_data import ( + ExperimentStatus, + JobStatus, + AnalysisStatus, +) from .base_analysis import BaseAnalysis from .base_experiment import BaseExperiment from .configs import ExperimentConfig, AnalysisConfig diff --git a/qiskit_experiments/framework/base_experiment.py b/qiskit_experiments/framework/base_experiment.py index 7653690b30..f0285c6917 100644 --- a/qiskit_experiments/framework/base_experiment.py +++ b/qiskit_experiments/framework/base_experiment.py @@ -46,6 +46,7 @@ def __init__( Args: qubits: list of physical qubits for the experiment. + analysis: Optional, the analysis to use for the experiment. backend: Optional, the backend to run the experiment on. experiment_type: Optional, the experiment type string. @@ -199,6 +200,7 @@ def run( self, backend: Optional[Backend] = None, analysis: Optional[Union[BaseAnalysis, None]] = "default", + timeout: Optional[float] = None, **run_options, ) -> ExperimentData: """Run an experiment and perform analysis. @@ -211,6 +213,8 @@ def run( analysis. If None analysis will not be run. If ``"default"`` the experiments :meth:`analysis` instance will be used if it contains one. + timeout: Time to wait for experiment jobs to finish running before + cancelling. run_options: backend runtime options used for circuit execution. Returns: @@ -270,7 +274,7 @@ def run( # Run jobs jobs = experiment._run_jobs(circuits, **run_opts) - experiment_data.add_data(jobs) + experiment_data.add_jobs(jobs, timeout=timeout) experiment._add_job_metadata(experiment_data.metadata, jobs, **run_opts) # Optionally run analysis diff --git a/releasenotes/notes/expdata-futures-87a2ff561375e22b.yaml b/releasenotes/notes/expdata-futures-87a2ff561375e22b.yaml new file mode 100644 index 0000000000..fbc9ed80cd --- /dev/null +++ b/releasenotes/notes/expdata-futures-87a2ff561375e22b.yaml @@ -0,0 +1,50 @@ +--- +features: + - | + Improves handling of job and analysis processes in :meth:`.ExperimentData`. + Verbose logging information on execution of analysis callbacks in an + experiment can enabled by setting the ``qiskit_experiments`` log level + to DEBUG. + - | + Adds :meth:`.ExperimentData.jobs` method for returning a list of + Qiskit Jobs for a running or finished experiment. + - | + Adds :meth:`.ExperimentData.job_status` method for returning the status + of Qiskit Job execution for an experiment. This returns a + :class:`.JobStatus` enum class value. + - | + Adds :meth:`.ExperimentData.analysis_status` method for returning the status + of analysis callbacks for an experiment. This returns a + :class:`.AnalysisStatus` enum class value. + - | + Adds :meth:`.ExperimentData.cancel_analysis` method to allow cancelling + pending analysis callbacks. Note that analysis callbacks that have already + started running cannot be cancelled. + - | + Adds :meth:`.ExperimentData.cancel` to cancel both jobs and analysis. + - | + Adds :meth:`.ExperimentData.add_jobs` method for adding one or more Qiskit + jobs to experiment data. This method takes an optional ``timeout`` kwarg that + when used will automatically cancel all non-finished jobs that exceed the + alloted time. + - | + Added enum classes for experiment, job, and analysis status +upgrade: + - | + The value returned by :meth:`.ExperimentData.status` has been changed from + a string to a :class:`.ExperimentStatus` enum class value. +deprecations: + - | + Adding data from jobs using :meth:`.ExperimentData.add_data` has been + deprecated. This method should now only be used to add data from Qiskit + :class:`.Result` objects or raw data dicts. + Job data should now be added using the new :meth:`.ExperimentData.add_jobs` + method instead. + - | + The ``timeout`` kwarg of :meth:`.ExperimentData.add_data` has been deprecated. + Timeout for adding jobs is now handled by the :meth:`.ExperimentData.add_jobs` + method. +fixes: + - | + Fixes an issue with :meth:`.ExperimentData.block_for_results` sometimes + having a race issue with all analysis callbacks finishing. diff --git a/test/database_service/test_db_experiment_data.py b/test/database_service/test_db_experiment_data.py index 353dde6935..0372becc4b 100644 --- a/test/database_service/test_db_experiment_data.py +++ b/test/database_service/test_db_experiment_data.py @@ -39,6 +39,10 @@ DbExperimentEntryNotFound, DbExperimentEntryExists, ) +from qiskit_experiments.database_service.db_experiment_data import ( + AnalysisStatus, + ExperimentStatus, +) class TestDbExperimentData(QiskitExperimentsTestCase): @@ -116,6 +120,7 @@ def test_add_data_job(self): for _ in range(2): job = mock.create_autospec(Job, instance=True) job.result.return_value = self._get_job_result(2) + job.status.return_value = JobStatus.DONE jobs.append(job) expected = a_job.result().get_counts() @@ -123,9 +128,9 @@ def test_add_data_job(self): expected.extend(job.result().get_counts()) exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") - exp_data.add_data(a_job) + exp_data.add_jobs(a_job) exp_data.block_for_results() - exp_data.add_data(jobs) + exp_data.add_jobs(jobs) exp_data.block_for_results() self.assertEqual(expected, [sdata["counts"] for sdata in exp_data.data()]) self.assertIn(a_job.job_id(), exp_data.job_ids) @@ -145,10 +150,11 @@ def _callback(_exp_data): a_job = mock.create_autospec(Job, instance=True) a_job.result.return_value = self._get_job_result(2) + a_job.status.return_value = JobStatus.DONE called_back = False exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") - exp_data.add_data(a_job) + exp_data.add_jobs(a_job) exp_data.add_analysis_callback(_callback) exp_data.block_for_results() self.assertTrue(called_back) @@ -198,11 +204,12 @@ def _callback(_exp_data, **kwargs): a_job = mock.create_autospec(Job, instance=True) a_job.result.return_value = self._get_job_result(2) + a_job.status.return_value = JobStatus.DONE called_back = False callback_kwargs = "foo" exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") - exp_data.add_data(a_job) + exp_data.add_jobs(a_job) exp_data.add_analysis_callback(_callback, foo=callback_kwargs) exp_data.block_for_results() self.assertTrue(called_back) @@ -215,13 +222,14 @@ def _callback(_exp_data, **kwargs): a_job = mock.create_autospec(Job, instance=True) a_job.result.return_value = self._get_job_result(2) + a_job.status.return_value = JobStatus.DONE event = threading.Event() self.addCleanup(event.set) exp_data = DbExperimentData(experiment_type="qiskit_test") exp_data.add_analysis_callback(_callback, event=event) - exp_data.add_data(a_job) + exp_data.add_jobs(a_job) with self.assertLogs("qiskit_experiments", "WARNING"): exp_data.add_data({"foo": "bar"}) @@ -366,7 +374,7 @@ def test_delayed_backend(self): self.assertIsNone(exp_data.service) exp_data.save_metadata() a_job = mock.create_autospec(Job, instance=True) - exp_data.add_data(a_job) + exp_data.add_jobs(a_job) self.assertIsNotNone(exp_data.backend) self.assertIsNotNone(exp_data.service) @@ -376,7 +384,7 @@ def test_different_backend(self): a_job = mock.create_autospec(Job, instance=True) self.assertNotEqual(exp_data.backend, a_job.backend()) with self.assertLogs("qiskit_experiments", "WARNING"): - exp_data.add_data(a_job) + exp_data.add_jobs(a_job) def test_add_get_analysis_result(self): """Test adding and getting analysis results.""" @@ -473,9 +481,10 @@ def test_set_service_job(self): mock_service = self._set_mock_service() job = mock.create_autospec(Job, instance=True) job.backend.return_value = self.backend + job.status.return_value = JobStatus.DONE exp_data = DbExperimentData(experiment_type="qiskit_test") self.assertIsNone(exp_data.service) - exp_data.add_data(job) + exp_data.add_jobs(job) self.assertEqual(mock_service, exp_data.service) def test_set_service_direct(self): @@ -499,7 +508,8 @@ def test_new_backend_has_service(self): new_service = self._set_mock_service() self.assertNotEqual(orig_service, new_service) job.backend.return_value = self.backend - exp_data.add_data(job) + job.status.return_value = JobStatus.DONE + exp_data.add_jobs(job) self.assertEqual(orig_service, exp_data.service) def test_auto_save(self): @@ -536,14 +546,16 @@ def test_status_job_pending(self): event = threading.Event() job2 = mock.create_autospec(Job, instance=True) job2.result = lambda *args, **kwargs: event.wait(timeout=15) - job2.status.return_value = JobStatus.RUNNING + job2.status = lambda: JobStatus.CANCELLED if event.is_set() else JobStatus.RUNNING self.addCleanup(event.set) exp_data = DbExperimentData(experiment_type="qiskit_test") - exp_data.add_data(job1) - exp_data.add_data(job2) + exp_data.add_jobs(job1) + exp_data.add_jobs(job2) exp_data.add_analysis_callback(lambda *args, **kwargs: event.wait(timeout=15)) - self.assertEqual("RUNNING", exp_data.status()) + self.assertEqual(ExperimentStatus.RUNNING, exp_data.status()) + self.assertEqual(JobStatus.RUNNING, exp_data.job_status()) + self.assertEqual(AnalysisStatus.QUEUED, exp_data.analysis_status()) # Cleanup with self.assertLogs("qiskit_experiments", "WARNING"): @@ -561,23 +573,42 @@ def test_status_job_error(self): exp_data = DbExperimentData(experiment_type="qiskit_test") with self.assertLogs(logger="qiskit_experiments.database_service", level="WARN") as cm: - exp_data.add_data([job1, job2]) + exp_data.add_jobs([job1, job2]) self.assertIn("Adding a job from a backend", ",".join(cm.output)) - self.assertEqual("ERROR", exp_data.status()) + self.assertEqual(ExperimentStatus.ERROR, exp_data.status()) def test_status_post_processing(self): """Test experiment status during post processing.""" job = mock.create_autospec(Job, instance=True) job.result.return_value = self._get_job_result(3) + job.status.return_value = JobStatus.DONE event = threading.Event() self.addCleanup(event.set) exp_data = DbExperimentData(experiment_type="qiskit_test") - exp_data.add_data(job) + exp_data.add_jobs(job) exp_data.add_analysis_callback((lambda *args, **kwargs: event.wait(timeout=15))) status = exp_data.status() - self.assertEqual("POST_PROCESSING", status) + self.assertEqual(ExperimentStatus.POST_PROCESSING, status) + + def test_status_cancelled_analysis(self): + """Test experiment status during post processing.""" + job = mock.create_autospec(Job, instance=True) + job.result.return_value = self._get_job_result(3) + job.status.return_value = JobStatus.DONE + + event = threading.Event() + self.addCleanup(event.set) + + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_jobs(job) + exp_data.add_analysis_callback((lambda *args, **kwargs: event.wait(timeout=2))) + # Add second callback because the first can't be cancelled once it has started + exp_data.add_analysis_callback((lambda *args, **kwargs: event.wait(timeout=20))) + exp_data.cancel_analysis() + status = exp_data.status() + self.assertEqual(ExperimentStatus.CANCELLED, status) def test_status_post_processing_error(self): """Test experiment status when post processing failed.""" @@ -587,26 +618,28 @@ def _post_processing(*args, **kwargs): job = mock.create_autospec(Job, instance=True) job.result.return_value = self._get_job_result(3) + job.status.return_value = JobStatus.DONE exp_data = DbExperimentData(experiment_type="qiskit_test") - exp_data.add_data(job) + exp_data.add_jobs(job) with self.assertLogs(logger="qiskit_experiments.database_service", level="WARN") as cm: - exp_data.add_data(job) + exp_data.add_jobs(job) exp_data.add_analysis_callback(_post_processing) exp_data.block_for_results() - self.assertEqual("ERROR", exp_data.status()) + self.assertEqual(ExperimentStatus.ERROR, exp_data.status()) self.assertIn("Kaboom!", ",".join(cm.output)) def test_status_done(self): """Test experiment status when all jobs are done.""" job = mock.create_autospec(Job, instance=True) job.result.return_value = self._get_job_result(3) + job.status.return_value = JobStatus.DONE exp_data = DbExperimentData(experiment_type="qiskit_test") - exp_data.add_data(job) - exp_data.add_data(job) + exp_data.add_jobs(job) + exp_data.add_jobs(job) exp_data.add_analysis_callback(lambda *args, **kwargs: time.sleep(1)) exp_data.block_for_results() - self.assertEqual("DONE", exp_data.status()) + self.assertEqual(ExperimentStatus.DONE, exp_data.status()) def test_set_tags(self): """Test updating experiment tags.""" @@ -627,7 +660,7 @@ def _job_result(): self.addCleanup(event.set) job = mock.create_autospec(Job, instance=True) job.result = _job_result - exp_data.add_data(job) + exp_data.add_jobs(job) exp_data.cancel_jobs() job.cancel.assert_called_once() @@ -636,6 +669,93 @@ def _job_result(): event.set() exp_data.block_for_results() + def test_cancel_analysis(self): + """Test canceling experiment analysis.""" + + event = threading.Event() + self.addCleanup(event.set) + + def _job_result(): + event.wait(timeout=15) + return self._get_job_result(1) + + def _analysis(*args): # pylint: disable = unused-argument + event.wait(timeout=15) + + job = mock.create_autospec(Job, instance=True) + job.job_id.return_value = "1234" + job.result = _job_result + job.status = lambda: JobStatus.DONE if event.is_set() else JobStatus.RUNNING + + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_jobs(job) + exp_data.add_analysis_callback(_analysis) + exp_data.cancel_analysis() + + # Test status while job still running + self.assertEqual(exp_data.job_status(), JobStatus.RUNNING) + self.assertEqual(exp_data.analysis_status(), AnalysisStatus.CANCELLED) + self.assertEqual(exp_data.status(), ExperimentStatus.RUNNING) + + # Test status after job finishes + event.set() + self.assertEqual(exp_data.job_status(), JobStatus.DONE) + self.assertEqual(exp_data.analysis_status(), AnalysisStatus.CANCELLED) + self.assertEqual(exp_data.status(), ExperimentStatus.CANCELLED) + + def test_cancel(self): + """Test canceling experiment jobs and analysis.""" + + event = threading.Event() + self.addCleanup(event.set) + + def _job_result(): + event.wait(timeout=15) + raise ValueError("Job was cancelled.") + + def _analysis(*args): # pylint: disable = unused-argument + event.wait(timeout=15) + + job = mock.create_autospec(Job, instance=True) + job.job_id.return_value = "1234" + job.result = _job_result + job.cancel = event.set + job.status = lambda: JobStatus.CANCELLED if event.is_set() else JobStatus.RUNNING + + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_jobs(job) + exp_data.add_analysis_callback(_analysis) + exp_data.cancel() + + # Test status while job still running + self.assertEqual(exp_data.job_status(), JobStatus.CANCELLED) + self.assertEqual(exp_data.analysis_status(), AnalysisStatus.CANCELLED) + self.assertEqual(exp_data.status(), ExperimentStatus.CANCELLED) + + def test_add_jobs_timeout(self): + """Test timeout kwarg of add_jobs""" + + event = threading.Event() + self.addCleanup(event.set) + + def _job_result(): + event.wait(timeout=15) + raise ValueError("Job was cancelled.") + + job = mock.create_autospec(Job, instance=True) + job.job_id.return_value = "1234" + job.result = _job_result + job.cancel = event.set + job.status = lambda: JobStatus.CANCELLED if event.is_set() else JobStatus.RUNNING + + exp_data = DbExperimentData(experiment_type="qiskit_test") + exp_data.add_jobs(job, timeout=0.5) + + with self.assertLogs("qiskit_experiments", "WARNING"): + exp_data.block_for_results() + self.assertEqual(exp_data.job_status(), JobStatus.CANCELLED) + self.assertEqual(exp_data.status(), ExperimentStatus.CANCELLED) + def test_metadata_serialization(self): """Test experiment metadata serialization.""" metadata = {"complex": 2 + 3j, "numpy": np.zeros(2)} @@ -656,6 +776,7 @@ def _post_processing(*args, **kwargs): # pylint: disable=unused-argument job1 = mock.create_autospec(Job, instance=True) job1.job_id.return_value = "1234" + job1.status.return_value = JobStatus.DONE job2 = mock.create_autospec(Job, instance=True) job2.status.return_value = JobStatus.ERROR @@ -663,11 +784,11 @@ def _post_processing(*args, **kwargs): # pylint: disable=unused-argument exp_data = DbExperimentData(experiment_type="qiskit_test") with self.assertLogs(logger="qiskit_experiments.database_service", level="WARN") as cm: - exp_data.add_data(job1) + exp_data.add_jobs(job1) exp_data.add_analysis_callback(_post_processing) - exp_data.add_data(job2) + exp_data.add_jobs(job2) exp_data.block_for_results() - self.assertEqual("ERROR", exp_data.status()) + self.assertEqual(ExperimentStatus.ERROR, exp_data.status()) self.assertIn("Kaboom", ",".join(cm.output)) self.assertTrue(re.match(r".*5678.*Kaboom!", exp_data.errors(), re.DOTALL)) @@ -690,7 +811,7 @@ def _sleeper(*args, **kwargs): # pylint: disable=unused-argument job = mock.create_autospec(Job, instance=True) job.result = _sleeper exp_data = DbExperimentData(experiment_type="qiskit_test") - exp_data.add_data(job) + exp_data.add_jobs(job) exp_data.add_analysis_callback(_sleeper) exp_data.block_for_results() self.assertEqual(2, sleep_count) @@ -712,27 +833,28 @@ def test_copy_metadata(self): def test_copy_metadata_pending_job(self): """Test copy metadata with a pending job.""" + event = threading.Event() + self.addCleanup(event.set) + job_results1 = self._get_job_result(1) + job_results2 = self._get_job_result(1) def _job1_result(): event.wait(timeout=15) - return job_results[0] + return job_results1 def _job2_result(): event.wait(timeout=15) - return job_results[1] + return job_results2 exp_data = DbExperimentData(experiment_type="qiskit_test") - event = threading.Event() - self.addCleanup(event.set) - job_results = [self._get_job_result(1), self._get_job_result(1)] job = mock.create_autospec(Job, instance=True) job.result = _job1_result - exp_data.add_data(job) + exp_data.add_jobs(job) copied = exp_data.copy(copy_results=False) job2 = mock.create_autospec(Job, instance=True) job2.result = _job2_result - copied.add_data(job2) + copied.add_jobs(job2) event.set() exp_data.block_for_results() diff --git a/test/database_service/test_experiment_data_integration.py b/test/database_service/test_experiment_data_integration.py index 7446fb452c..ce2cd69ea4 100644 --- a/test/database_service/test_experiment_data_integration.py +++ b/test/database_service/test_experiment_data_integration.py @@ -99,7 +99,7 @@ def test_add_data_job(self): transpiled = transpile(ReferenceCircuits.bell(), self.backend) transpiled.metadata = {"foo": "bar"} job = self._run_circuit(transpiled) - exp_data.add_data(job) + exp_data.add_jobs(job) self.assertEqual([job.job_id()], exp_data.job_ids) result = job.result() exp_data.block_for_results() @@ -123,7 +123,7 @@ def test_new_experiment_data(self): job_ids = [] for _ in range(2): job = self._run_circuit() - exp_data.add_data(job) + exp_data.add_jobs(job) job_ids.append(job.job_id()) exp_data.save() @@ -142,7 +142,7 @@ def test_update_experiment_data(self): for _ in range(2): job = self._run_circuit() - exp_data.add_data(job) + exp_data.add_jobs(job) exp_data.tags = ["foo", "bar"] exp_data.share_level = "hub" exp_data.notes = "some notes" @@ -332,7 +332,7 @@ def test_set_service_job(self): """Test setting service with a job.""" exp_data = DbExperimentData(experiment_type="qiskit_test") job = self._run_circuit() - exp_data.add_data(job) + exp_data.add_jobs(job) exp_data.save() rexp = self.experiment.experiment(exp_data.experiment_id) @@ -466,7 +466,7 @@ def test_block_for_results(self): jobs = [] for _ in range(2): job = self._run_circuit() - exp_data.add_data(job) + exp_data.add_jobs(job) jobs.append(job) exp_data.block_for_results() self.assertTrue(all(job.status() == JobStatus.DONE for job in jobs)) diff --git a/test/test_composite.py b/test/test_composite.py index 09a7784829..50a0baef05 100644 --- a/test/test_composite.py +++ b/test/test_composite.py @@ -173,7 +173,7 @@ def test_nested_composite(self): nested_exp = BatchExperiment([exp5, exp3]) expdata = nested_exp.run(FakeBackend()).block_for_results() status = expdata.status() - self.assertEqual(status, "DONE") + self.assertEqual(status.name, "DONE") def test_analysis_replace_results_true(self): """