Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 134 additions & 21 deletions qiskit_experiments/database_service/db_experiment_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@

"""Stored data class."""

import sys
import time
import logging
import dataclasses
import threading
import uuid
from typing import Optional, List, Any, Union, Callable, Dict
from typing import Optional, List, Any, Union, Callable, Dict, Tuple
import copy
from concurrent import futures
from functools import wraps
Expand Down Expand Up @@ -968,6 +970,95 @@ def _wait_for_callbacks(self, timeout: Optional[float] = None):
"Possibly incomplete analysis results: an analysis callback raised an error."
)

def monitor(self, interval=2, output=sys.stdout, line_discipline="\r"):
"""Monitors and outputs the live status of the experiment.

The output is in format "Experiment Status:" followed by one of the possible
* JOB (1/N) `STATUS` if the experiment is in the job phase. For full possible statuses of
each job, see the :meth:`status` method.
* ANALYSIS IN PROGRESS if the experiment is in the analysis phase.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After #599 this could be changed to something like Analysis (1/N) STATUS` like for jobs, since you should be able to see how many analysis jobs there are and get the status analysis status.


Args:
interval (int): The interval in seconds at which to update status, default 2.
output (file): What object to write the output to, default stdout.
line_discipline (str): Character printed at the start of a line, default \\r.

Returns:
Live updated status.
"""
short_status = self.status()
statuses, queue_info = self._job_status()

num_jobs = len(statuses)

# print(statuses, queue_info)
# print(queue_info[0]._status)

# if short_status in ["DONE", "CANCELLED", "ERROR"]:
# msg_status =

# if short_status == "RUNNING": # Find job in progress
# for q in range(len(queue_info)):
# if queue_info[q]._status == "RUNNING":
# active_job = q + 1
# msg_status = "RUNNING"

# elif short_status == "QUEUED": # Find job in progress
# active_job = None
# min_queue = float("inf")
# for q in range(len(queue_info)):
# if queue_info[q]._status == "QUEUED":
# if queue_info.position < min_queue:
# active_job = q + 1
# min_queue = queue_info.position
# msg_status = f"QUEUED, position {min_queue}"
msg = ""
q = 1

for job in queue_info:
msg += f"job {q}/{num_jobs} has status {job._status}"
if job._status in ["QUEUED", "PENDING_IN_QUEUE"]:
msg += f", queue position{job.position}\n"
msg += "\n"

prev_msg = msg
msg_len = len(msg)

print("{}{}: {}".format(line_discipline, "Experiment Status", msg), end="", file=output)
while self.status() not in ["DONE", "CANCELLED", "ERROR"]:
time.sleep(interval)

msg = ""
for job in queue_info:
msg += f"job {q}/{num_jobs} has status {job._status}"
if job._status in ["QUEUED", "PENDING_IN_QUEUE"]:
msg += f", queue position{job.position}\n"
msg += "\n"
# if status.name == "QUEUED":
# msg += " (%s)" % job.queue_position()
# if job.queue_position() is None:
# interval = 2
# elif not _interval_set:
# interval = max(job.queue_position(), 2)
# else:
# if not _interval_set:
# interval = 2

# Adjust length of message so there are no artifacts
if len(msg) < msg_len:
msg += " " * (msg_len - len(msg))
elif len(msg) > msg_len:
msg_len = len(msg)

if msg != prev_msg:
print(
"{}{}: {}".format(line_discipline, "Experiment Status", msg),
end="",
file=output,
)
prev_msg = msg
print("", file=output)

def status(self) -> str:
"""Return the data processing status.

Expand Down Expand Up @@ -1004,18 +1095,27 @@ def status(self) -> str:
):
return "INITIALIZING"

job_status = self._job_status()
if job_status != "DONE":
return job_status
job_statuses, _ = self._job_status()

for stat in [
JobStatus.ERROR,
JobStatus.CANCELLED,
JobStatus.RUNNING,
JobStatus.QUEUED,
JobStatus.VALIDATING,
JobStatus.INITIALIZING,
]:
if stat in job_statuses:
return stat.name

callback_status = self._callback_status()
if callback_status in ["QUEUED", "RUNNING"]:
return "POST_PROCESSING"

return callback_status

def _job_status(self) -> str:
"""Return the experiment job execution status.
def _job_status(self) -> Tuple[List, List]:
"""Return execution status of all jobs in the experiment.

If the experiment consists of multiple jobs, the returned status is mapped
in the following order:
Expand All @@ -1029,16 +1129,22 @@ def _job_status(self) -> str:
* DONE - if all jobs are finished.

Returns:
Job execution status.
Job execution status. If queued, also returns queue position and expected
completion time.
"""
# Backend jobs
statuses = set()
statuses = []
queue_info = []
with self._job_futures.lock:
for idx, (kwargs, fut) in enumerate(self._job_futures):
all_jobs_done = True
for job in kwargs["jobs"]:
job_status = job.status()
statuses.add(job_status)
statuses.append(job_status)
# if job_status == JobStatus.QUEUED:
queue_info.append(job.queue_info())
# else:
# queue_info.append(None)
if job_status != JobStatus.DONE:
all_jobs_done = False
if fut.done():
Expand All @@ -1049,18 +1155,25 @@ def _job_status(self) -> str:
self._job_futures[idx] = None
self._job_futures = ThreadSafeList(list(filter(None, self._job_futures)))

for stat in [
JobStatus.ERROR,
JobStatus.CANCELLED,
JobStatus.RUNNING,
JobStatus.QUEUED,
JobStatus.VALIDATING,
JobStatus.INITIALIZING,
]:
if stat in statuses:
return stat.name

return "DONE"
return statuses, queue_info

# for stat in [
# JobStatus.ERROR,
# JobStatus.CANCELLED,
# JobStatus.RUNNING,
# JobStatus.QUEUED,
# JobStatus.VALIDATING,
# JobStatus.INITIALIZING,
# ]:
# if JobStatus.QUEUED in statuses:
# # return info for the job closest to running
# return [
# JobStatus.QUEUED,
# ]
# if stat in statuses:
# return [stat.name]

# return "DONE"

def _callback_status(self) -> str:
"""Return the data analysis callback post-processing status.
Expand Down
3 changes: 3 additions & 0 deletions qiskit_experiments/test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ def status(self) -> JobStatus:
if self._result:
return JobStatus.DONE
return JobStatus.RUNNING

def queue_info(self):
return None