Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 123 additions & 116 deletions qiskit_experiments/database_service/db_experiment_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import logging
import dataclasses
import threading
import uuid
from typing import Optional, List, Any, Union, Callable, Dict
import copy
Expand Down Expand Up @@ -70,13 +69,13 @@ def service_exception_to_warning():


@dataclasses.dataclass
class CallbackStatus:
"""Dataclass for analysis callback status"""
class Callback:
"""Dataclass for analysis callback functions"""

callback: Callable
func: Callable
kwargs: Dict = dataclasses.field(default_factory=dict)
status: JobStatus = JobStatus.QUEUED
event: threading.Event = dataclasses.field(default_factory=threading.Event)
callback_id: str = ""
status: JobStatus = JobStatus.INITIALIZING
error_msg: Optional[str] = None


Expand Down Expand Up @@ -104,7 +103,7 @@ class DbExperimentDataV1(DbExperimentData):
version = 1
verbose = True # Whether to print messages to the standard output.
_metadata_version = 1
_executor = futures.ThreadPoolExecutor()
_job_executor = futures.ThreadPoolExecutor()
"""Threads used for asynchronous processing."""

_json_encoder = ExperimentEncoder
Expand Down Expand Up @@ -166,8 +165,9 @@ def __init__(

self._jobs = ThreadSafeOrderedDict(job_ids or [])
self._job_futures = ThreadSafeList()
self._callback_statuses = ThreadSafeOrderedDict()
self._callback_future = None
self._callback_executor = futures.ThreadPoolExecutor(max_workers=1)
self._callbacks = ThreadSafeOrderedDict()
self._callback_futures = ThreadSafeOrderedDict()

self._data = ThreadSafeList()
self._figures = ThreadSafeOrderedDict(figure_names or [])
Expand Down Expand Up @@ -220,7 +220,7 @@ def add_data(
Raises:
TypeError: If the input data type is invalid.
"""
if any(not status.event.is_set() for status in self._callback_statuses.values()):
if any(not future.done() for future in self._callback_futures.values()):
LOG.warning(
"Not all post-processing has finished. Adding new data "
"may create unexpected analysis results."
Expand Down Expand Up @@ -265,7 +265,7 @@ def add_data(
"timeout": timeout,
}
self._job_futures.append(
(job_kwargs, self._executor.submit(self._add_jobs_data, **job_kwargs))
(job_kwargs, self._job_executor.submit(self._add_jobs_data, **job_kwargs))
)

if self.auto_save:
Expand All @@ -288,52 +288,65 @@ def add_analysis_callback(self, callback: Callable, **kwargs: Any):
keyword arguments passed to this method.
**kwargs: Keyword arguments to be passed to the callback function.
"""
callback_id = uuid.uuid4()
self._callback_statuses[callback_id] = CallbackStatus(callback, kwargs=kwargs)
with self._job_futures.lock and self._callback_futures.lock:
# Create callback dataclass
cid = uuid.uuid4().hex
self._callbacks[cid] = Callback(callback, kwargs=kwargs, callback_id=cid)

# Get futures to wait for before running callback
if self._callback_futures:
futs = self._callback_futures.values()
else:
futs = [fut for _, fut in self._job_futures.copy()]

# Wrap callback function to handle reporting status and catching
# any exceptions and their error messages
def _wrapped_callback():
try:
self._callback_statuses[callback_id].status = JobStatus.RUNNING
callback(self, **kwargs)
self._callback_statuses[callback_id].status = JobStatus.DONE
except Exception as ex: # pylint: disable=broad-except
self._callback_statuses[callback_id].status = JobStatus.ERROR
error_msg = f"Analysis callback {callback} failed: \n" "".join(
traceback.format_exception(type(ex), ex, ex.__traceback__)
)
self._callback_statuses[callback_id].error_msg = error_msg
LOG.warning("Analysis callback %s failed:\n%s", callback, traceback.format_exc())
self._callback_statuses[callback_id].event.set()
# Add run callback future
self._callback_futures[cid] = self._callback_executor.submit(
self._run_callback, cid, futs
)

with self._job_futures.lock:
# Determine if a future is running that we need to add callback to
fut_done = True
if self._job_futures:
_, fut = self._job_futures[-1]
fut_done = fut.done()
if fut_done and self._callback_future is not None:
fut = self._callback_future
fut_done = fut.done()
if fut_done:
fut = None

if fut_done:
# Submit future so analysis can run async even if there are no
# running jobs or running analysis.
self._callback_future = self._executor.submit(_wrapped_callback)
else:
# Wrap the wrapped function for the format expected by Python
# Future.add_done_callback
def _done_callback(fut):
if fut.cancelled():
self._callback_statuses[callback_id].status = JobStatus.CANCELLED
self._callback_statuses[callback_id].event.set()
else:
_wrapped_callback()
def cancel_callbacks(self) -> None:
"""Cancel any queued callbacks.

fut.add_done_callback(_done_callback)
.. note::
A currently running callback cannot be cancelled.
"""
with self._callback_futures.lock:
for cid, fut in self._callback_futures.items():
if fut.done():
continue
if fut.cancel():
LOG.info("Cancelled queued callback [cid: %s].", cid)
self._callbacks[cid].status = JobStatus.CANCELLED
else:
LOG.warning("Unable to cancel running callback [cid: %s].", cid)

def _run_callback(self, callback_id: str, futs: Optional[List[futures.Future]] = None):
    """Run a callback after specified futures have finished.

    The status of the stored callback is updated as it progresses:
    ``QUEUED`` while waiting on ``futs``, ``RUNNING`` while the callback
    function executes, and finally ``DONE`` or ``ERROR``. An exception
    raised by the callback function is not propagated; it is formatted
    into the callback's ``error_msg`` and logged as a warning.

    Args:
        callback_id: Key of the callback in ``self._callbacks``.
        futs: Futures to wait on before the callback function is called.
            If ``None`` or empty the callback function is called without
            waiting.

    Raises:
        ValueError: If ``callback_id`` is not a key of ``self._callbacks``.
    """
    if callback_id not in self._callbacks:
        raise ValueError(f"No callback with id {callback_id}")

    callback = self._callbacks[callback_id]

    # Wait for previous futures to finish
    LOG.debug("Waiting to run callback [cid %s]", callback_id)
    self._callbacks[callback_id].status = JobStatus.QUEUED
    if futs:
        futures.wait(futs)

    # Run callback function
    LOG.debug("Running callback [cid: %s]", callback_id)
    self._callbacks[callback_id].status = JobStatus.RUNNING
    try:
        callback.func(self, **callback.kwargs)
        self._callbacks[callback_id].status = JobStatus.DONE
        LOG.debug("Callback finished [cid: %s]", callback_id)
    except Exception as ex:  # pylint: disable=broad-except
        self._callbacks[callback_id].status = JobStatus.ERROR
        # The header must be concatenated with an explicit "+". Writing the
        # two literals side by side makes the header the *separator* of
        # str.join, which scatters it between the traceback lines.
        error_msg = f"Analysis callback failed [cid: {callback_id}]:\n" + "".join(
            traceback.format_exception(type(ex), ex, ex.__traceback__)
        )
        self._callbacks[callback_id].error_msg = error_msg
        LOG.warning(error_msg)

def _add_jobs_data(
self,
Expand Down Expand Up @@ -862,8 +875,8 @@ def save(self) -> None:

if self.verbose:
print(
"You can view the experiment online at https://quantum-computing.ibm.com/experiments/"
+ self.experiment_id
"You can view the experiment online at "
"https://quantum-computing.ibm.com/experiments/" + self.experiment_id
)

@classmethod
Expand Down Expand Up @@ -930,8 +943,11 @@ def block_for_results(self, timeout: Optional[float] = None) -> "DbExperimentDat
Returns:
The experiment data with finished jobs and post-processing.
"""
_, timeout = combined_timeout(self._wait_for_jobs, timeout)
_, timeout = combined_timeout(self._wait_for_callbacks, timeout)
if self._callback_futures:
self._wait_for_callbacks(timeout)
else:
self._wait_for_jobs(timeout)
self._removed_done_futures()
return self

def _wait_for_jobs(self, timeout: Optional[float] = None):
Expand Down Expand Up @@ -960,29 +976,27 @@ def _wait_for_jobs(self, timeout: Optional[float] = None):

def _wait_for_callbacks(self, timeout: Optional[float] = None):
"""Wait for analysis callbacks to finish"""
# Wait for analysis callbacks to finish
if self._callback_statuses:
for status in self._callback_statuses.values():
if status.status in [JobStatus.DONE, JobStatus.CANCELLED]:
continue
LOG.info("Waiting for analysis callback %s to finish.", status.callback)
finished, timeout = combined_timeout(status.event.wait, timeout)
if not finished:
LOG.warning(
"Possibly incomplete analysis results:"
" analysis"
" callback %s timed out.",
status.callback,
)
try:
LOG.debug("Waiting for all callbacks to finish [eid: %s]", self.experiment_id)
waited = futures.wait(self._callback_futures.values(), timeout=timeout)
if waited.not_done:
raise futures.TimeoutError
LOG.debug("All callbacks finished [eid: %s]", self.experiment_id)
except futures.TimeoutError:
LOG.warning("Waiting for callbacks timed out before completion.")
except futures.CancelledError:
LOG.warning("Callbacks were cancelled before completion.")

def _removed_done_futures(self):
"""Remove futures that have finished"""
with self._callback_futures.lock and self._job_futures.lock:
running_callbacks = [
(cid, fut) for cid, fut in self._callback_futures.items() if not fut.done()
]
self._callback_futures = ThreadSafeOrderedDict(running_callbacks)

# Check analysis status and show warning if cancelled or error
callback_status = self._callback_status()
if callback_status == "CANCELLED":
LOG.warning("Possibly incomplete analysis results: an analysis callback was cancelled.")
elif callback_status == "ERROR":
LOG.warning(
"Possibly incomplete analysis results: an analysis callback raised an error."
)
running_jobs = [(jid, fut) for jid, fut in self._job_futures if not fut.done()]
self._job_futures = ThreadSafeList(running_jobs)

def status(self) -> str:
"""Return the data processing status.
Expand All @@ -999,6 +1013,8 @@ def status(self) -> str:
* POST_PROCESSING - if any analysis callbacks are still running
* DONE - if all jobs and analysis callbacks are finished.

If no data has been added the returned status will be EMPTY.

.. note::

If an experiment has status ERROR or CANCELLED there may still
Expand All @@ -1013,22 +1029,23 @@ def status(self) -> str:
for container in [
self._data,
self._jobs,
self._callback_statuses,
self._job_futures,
self._callbacks,
self._callback_futures,
self._figures,
self._analysis_results,
]
):
return "INITIALIZING"
return "EMPTY"

job_status = self._job_status()
if job_status != "DONE":
return job_status

callback_status = self._callback_status()
if callback_status in ["QUEUED", "RUNNING"]:
return "POST_PROCESSING"

return callback_status
if callback_status in ["DONE", "CANCELLED", "ERROR"]:
return callback_status
return "POST_PROCESSING"

def _job_status(self) -> str:
"""Return the experiment job execution status.
Expand Down Expand Up @@ -1084,34 +1101,33 @@ def _callback_status(self) -> str:
If the experiment consists of multiple analysis callbacks, the returned
status is mapped in the following order:

* ERROR - if any analysis callback incurred an error.
* CANCELLED - if any analysis callback is cancelled.
* RUNNING - if any analysis callbacks are still running.
* QUEUED - if any analysis callback is queued.
* DONE - if all analysis callbacks are finished.
* ERROR - if any callback incurred an error.
* CANCELLED - if any callback was cancelled.
* RUNNING - if any callback is still running.
* QUEUED - if any callback is queued.
* INITIALIZING - if any callback is being initialized.
* DONE - if all callbacks are finished.

Returns:
Analysis callback status.
"""
statuses = set()
for status in self._callback_statuses.values():
for status in self._callbacks.values():
statuses.add(status.status)

# Remove analysis future if it is done, since we store all statuses
# In the _callback_status field.
if self._callback_future is not None and self._callback_future.done():
self._callback_future = None

for stat in [
JobStatus.ERROR,
JobStatus.CANCELLED,
JobStatus.RUNNING,
JobStatus.QUEUED,
JobStatus.VALIDATING,
JobStatus.INITIALIZING,
JobStatus.DONE,
]:
if stat in statuses:
return stat.name

return "DONE"
return JobStatus.DONE.name

def errors(self) -> str:
"""Return errors encountered.
Expand All @@ -1120,31 +1136,22 @@ def errors(self) -> str:
Experiment errors.
"""
errors = []
# Get any future errors
for fut_kwargs, fut in self._job_futures:
if fut.done():
ex = fut.exception()
if ex:
jobs = [job.job_id() for job in fut_kwargs["jobs"]]
errors.append(
f"Job {jobs} failed: \n"
+ "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
)

# Get any job errors
for job in self._jobs.values():
if job and job.status() == JobStatus.ERROR:
job_err = "."
if hasattr(job, "error_message"):
job_err = ": " + job.error_message()
errors.append(f"Job {job.job_id()} failed{job_err}")
error_msg = job.error_message()
else:
error_msg = ""
errors.append(f"\n[jid: {job.job_id()}]: {error_msg}")

# Get any analysis callback errors
for status in self._callback_statuses.values():
if status.error_msg is not None:
errors.append(status.error_msg)
# Get any callback errors
for callback in self._callbacks.values():
if callback.status == JobStatus.ERROR:
errors.append(f"\n[cid: {callback.callback_id}]: {callback.error_msg}")

return "\n".join(errors)
return "".join(errors)

def copy(self, copy_results: bool = True) -> "DbExperimentDataV1":
"""Make a copy of the experiment data with a new experiment ID.
Expand Down
9 changes: 3 additions & 6 deletions qiskit_experiments/framework/composite/composite_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,10 @@ def _run_analysis(self, experiment_data: ExperimentData):
)
analysis_results.append(result)

# Add callback to wait for all component analysis to finish before returning
# Wait for all component analysis to finish before returning
# the parent experiment analysis results
def _wait_for_components(experiment_data, component_ids):
for comp_id in component_ids:
experiment_data.child_data(comp_id).block_for_results()

experiment_data.add_analysis_callback(_wait_for_components, component_ids=component_ids)
for comp_id in component_ids:
experiment_data.child_data(comp_id).block_for_results()

return analysis_results, []

Expand Down
Loading