diff --git a/qiskit_experiments/database_service/db_experiment_data.py b/qiskit_experiments/database_service/db_experiment_data.py index cc37371ce9..95a6d8b6ab 100644 --- a/qiskit_experiments/database_service/db_experiment_data.py +++ b/qiskit_experiments/database_service/db_experiment_data.py @@ -12,11 +12,13 @@ """Stored data class.""" +import sys +import time import logging import dataclasses import threading import uuid -from typing import Optional, List, Any, Union, Callable, Dict +from typing import Optional, List, Any, Union, Callable, Dict, Tuple import copy from concurrent import futures from functools import wraps @@ -968,6 +970,95 @@ def _wait_for_callbacks(self, timeout: Optional[float] = None): "Possibly incomplete analysis results: an analysis callback raised an error." ) + def monitor(self, interval=2, output=sys.stdout, line_discipline="\r"): + """Monitors and outputs the live status of the experiment. + + The output is in format "Experiment Status:" followed by one of the possible + * JOB (1/N) `STATUS` if the experiment is in the job phase. For full possible statuses of + each job, see the :meth:`status` method. + * ANALYSIS IN PROGRESS if the experiment is in the analysis phase. + + Args: + interval (int): The interval in seconds at which to update status, default 2. + output (file): What object to write the output to, default stdout. + line_discipline (str): Character printed at the start of a line, default \\r. + + Returns: + Live updated status. + """ + short_status = self.status() + statuses, queue_info = self._job_status() + + num_jobs = len(statuses) + + # print(statuses, queue_info) + # print(queue_info[0]._status) + + # if short_status in ["DONE", "CANCELLED", "ERROR"]: + # msg_status = + + # if short_status == "RUNNING": # Find job in progress + # for q in range(len(queue_info)): + # if queue_info[q]._status == "RUNNING": + # active_job = q + 1 + # msg_status = "RUNNING" + + # elif short_status == "QUEUED": # Find job in progress + # active_job = None + # min_queue = float("inf") + # for q in range(len(queue_info)): + # if queue_info[q]._status == "QUEUED": + # if queue_info.position < min_queue: + # active_job = q + 1 + # min_queue = queue_info.position + # msg_status = f"QUEUED, position {min_queue}" + msg = "" + q = 1 + + for job in queue_info: + msg += f"job {q}/{num_jobs} has status {job._status}" + if job._status in ["QUEUED", "PENDING_IN_QUEUE"]: + msg += f", queue position{job.position}\n" + msg += "\n" + + prev_msg = msg + msg_len = len(msg) + + print("{}{}: {}".format(line_discipline, "Experiment Status", msg), end="", file=output) + while self.status() not in ["DONE", "CANCELLED", "ERROR"]: + time.sleep(interval) + + msg = "" + for job in queue_info: + msg += f"job {q}/{num_jobs} has status {job._status}" + if job._status in ["QUEUED", "PENDING_IN_QUEUE"]: + msg += f", queue position{job.position}\n" + msg += "\n" + # if status.name == "QUEUED": + # msg += " (%s)" % job.queue_position() + # if job.queue_position() is None: + # interval = 2 + # elif not _interval_set: + # interval = max(job.queue_position(), 2) + # else: + # if not _interval_set: + # interval = 2 + + # Adjust length of message so there are no artifacts + if len(msg) < msg_len: + msg += " " * (msg_len - len(msg)) + elif len(msg) > msg_len: + msg_len = len(msg) + + if msg != prev_msg: + print( + "{}{}: {}".format(line_discipline, "Experiment Status", msg), + end="", + file=output, + ) + prev_msg = msg + print("", file=output) + def status(self) -> str: """Return the data processing status. @@ -1004,9 +1095,18 @@ def status(self) -> str: ): return "INITIALIZING" - job_status = self._job_status() - if job_status != "DONE": - return job_status + job_statuses, _ = self._job_status() + + for stat in [ + JobStatus.ERROR, + JobStatus.CANCELLED, + JobStatus.RUNNING, + JobStatus.QUEUED, + JobStatus.VALIDATING, + JobStatus.INITIALIZING, + ]: + if stat in job_statuses: + return stat.name callback_status = self._callback_status() if callback_status in ["QUEUED", "RUNNING"]: @@ -1014,8 +1114,8 @@ def status(self) -> str: return callback_status - def _job_status(self) -> str: - """Return the experiment job execution status. + def _job_status(self) -> Tuple[List, List]: + """Return execution status of all jobs in the experiment. If the experiment consists of multiple jobs, the returned status is mapped in the following order: @@ -1029,16 +1129,22 @@ def _job_status(self) -> str: * DONE - if all jobs are finished. Returns: - Job execution status. + Job execution status. If queued, also returns queue position and expected + completion time. """ # Backend jobs - statuses = set() + statuses = [] + queue_info = [] with self._job_futures.lock: for idx, (kwargs, fut) in enumerate(self._job_futures): all_jobs_done = True for job in kwargs["jobs"]: job_status = job.status() - statuses.add(job_status) + statuses.append(job_status) + # if job_status == JobStatus.QUEUED: + queue_info.append(job.queue_info()) + # else: + # queue_info.append(None) if job_status != JobStatus.DONE: all_jobs_done = False if fut.done(): @@ -1049,18 +1155,25 @@ def _job_status(self) -> str: self._job_futures[idx] = None self._job_futures = ThreadSafeList(list(filter(None, self._job_futures))) - for stat in [ - JobStatus.ERROR, - JobStatus.CANCELLED, - JobStatus.RUNNING, - JobStatus.QUEUED, - JobStatus.VALIDATING, - JobStatus.INITIALIZING, - ]: - if stat in statuses: - return stat.name - - return "DONE" + return statuses, queue_info + + # for stat in [ + # JobStatus.ERROR, + # JobStatus.CANCELLED, + # JobStatus.RUNNING, + # JobStatus.QUEUED, + # JobStatus.VALIDATING, + # JobStatus.INITIALIZING, + # ]: + # if JobStatus.QUEUED in statuses: + # # return info for the job closest to running + # return [ + # JobStatus.QUEUED, + # ] + # if stat in statuses: + # return [stat.name] + + # return "DONE" def _callback_status(self) -> str: """Return the data analysis callback post-processing status. diff --git a/qiskit_experiments/test/utils.py b/qiskit_experiments/test/utils.py index a3700083cd..04934d48b8 100644 --- a/qiskit_experiments/test/utils.py +++ b/qiskit_experiments/test/utils.py @@ -55,3 +55,6 @@ def status(self) -> JobStatus: if self._result: return JobStatus.DONE return JobStatus.RUNNING + + def queue_info(self): + return None