Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
464192f
Remove version field from runtime program (#152)
rathishcholarajan Oct 18, 2021
abb98d0
Rename isPublic to is_public when creating or reading runtime program…
rathishcholarajan Oct 18, 2021
31fdd4b
Update programId to program_id when running program (#139)
renier Oct 18, 2021
0d5ebf8
Add support to view program update date (#160)
rathishcholarajan Oct 18, 2021
767afeb
Upload runtime program using 'data' field (#157)
rathishcholarajan Oct 19, 2021
24dfa95
Read programs from "programs" array in response (#161)
rathishcholarajan Oct 19, 2021
79cd9bd
Remove version field from runtime program (#152)
rathishcholarajan Oct 18, 2021
45ab0b2
Rename isPublic to is_public when creating or reading runtime program…
rathishcholarajan Oct 18, 2021
faff4e2
Update programId to program_id when running program (#139)
renier Oct 18, 2021
bcc27fb
Add support to view program update date (#160)
rathishcholarajan Oct 18, 2021
866d045
Upload runtime program using 'data' field (#157)
rathishcholarajan Oct 19, 2021
eda2a51
Read programs from "programs" array in response (#161)
rathishcholarajan Oct 19, 2021
ab33b52
Pass program as base64 string to update (#168)
rathishcholarajan Oct 21, 2021
095c620
remove runtime job results poll
kt474 Oct 23, 2021
98a29fe
fix lint
kt474 Oct 23, 2021
68edf13
refactor
kt474 Oct 24, 2021
5807fa9
add reno
kt474 Oct 24, 2021
0fd4bad
Merge branch 'main' into job-results-stream
kt474 Oct 25, 2021
a8fe605
Accept JSON schema as program metadata (#158)
rathishcholarajan Oct 25, 2021
8afba87
update result stream & test case
kt474 Oct 26, 2021
1b26e0b
Merge branch 'runtime-release-q4' of https://github.com/Qiskit-Partne…
kt474 Oct 26, 2021
df4eb81
Pass program params as object (#171)
rathishcholarajan Oct 27, 2021
e3e4315
Fix integration tests
rathishcholarajan Oct 27, 2021
f5712bc
Merge branch 'runtime-release-q4' of https://github.com/Qiskit-Partne…
kt474 Oct 28, 2021
0f95825
Use count to reduce one last extra call to API (#172)
rathishcholarajan Oct 28, 2021
b532b14
Allow updating runtime metadata in place (#188)
jyu00 Oct 29, 2021
f395f81
remove reno, start stream logic
kt474 Nov 1, 2021
da3ae88
Merge branch 'main' into job-results-stream
kt474 Nov 1, 2021
c1065f0
Merge branch 'runtime-release-q4' of https://github.com/Qiskit-Partne…
kt474 Nov 1, 2021
fbcad81
Merge branch 'main' into job-results-stream
rathishcholarajan Nov 2, 2021
f5b4e54
Remove version field from runtime program (#152)
rathishcholarajan Oct 18, 2021
93a2b85
Rename isPublic to is_public when creating or reading runtime program…
rathishcholarajan Oct 18, 2021
966d42a
Update programId to program_id when running program (#139)
renier Oct 18, 2021
031fc25
Add support to view program update date (#160)
rathishcholarajan Oct 18, 2021
1561a9b
Upload runtime program using 'data' field (#157)
rathishcholarajan Oct 19, 2021
b656ece
Read programs from "programs" array in response (#161)
rathishcholarajan Oct 19, 2021
8fb3669
Pass program as base64 string to update (#168)
rathishcholarajan Oct 21, 2021
74472c5
Accept JSON schema as program metadata (#158)
rathishcholarajan Oct 25, 2021
ee76158
Pass program params as object (#171)
rathishcholarajan Oct 27, 2021
da52271
Fix integration tests
rathishcholarajan Oct 27, 2021
f3033b8
Use count to reduce one last extra call to API (#172)
rathishcholarajan Oct 28, 2021
aea236b
Allow updating runtime metadata in place (#188)
jyu00 Oct 29, 2021
e71b402
Merge branch 'runtime-release-q4' of https://github.com/Qiskit-Partne…
kt474 Nov 2, 2021
290d69f
refactor & cleanup
kt474 Nov 2, 2021
b3a54a2
minor refactor
kt474 Nov 2, 2021
ddf9f1d
rename method, remove old
kt474 Nov 4, 2021
4102480
Allow filtering runtime jobs by program ID (#193)
rathishcholarajan Nov 4, 2021
5e7e992
add call to self.status()
kt474 Nov 4, 2021
8e8bcb3
Allow runtime program authors to retrieve program data (#174)
kt474 Nov 4, 2021
07f0d53
Update cache after updating program (#196)
rathishcholarajan Nov 5, 2021
bef1107
Merge branch 'runtime-release-q4' into job-results-stream
rathishcholarajan Nov 5, 2021
8133076
Allow filtering runtime jobs by provider (#197)
kt474 Nov 8, 2021
06f0e1f
Merge branch 'runtime-release-q4' into job-results-stream
kt474 Nov 8, 2021
5f8e8db
refactor timeout
kt474 Nov 8, 2021
aac2d79
fix lint
kt474 Nov 8, 2021
4522d9b
Support pagination for retrieving runtime programs (#170)
kt474 Nov 8, 2021
1fa914e
Merge branch 'runtime-release-q4' into job-results-stream
kt474 Nov 8, 2021
8b59137
Merge branch 'main' into job-results-stream
rathishcholarajan Nov 9, 2021
7c481bd
Merge branch 'main' into job-results-stream
kt474 Nov 12, 2021
0836763
add mock test method
kt474 Nov 17, 2021
78edfa7
update sleep time
kt474 Nov 17, 2021
42dfc76
update sleep times & refactor
kt474 Nov 17, 2021
374a32f
removed unused import
kt474 Nov 17, 2021
883eef2
Merge branch 'main' into job-results-stream
kt474 Nov 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 17 additions & 29 deletions qiskit_ibm/runtime/runtime_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@
"""Qiskit runtime job."""

from typing import Any, Optional, Callable, Dict, Type
import time
import logging
from concurrent import futures
import traceback
import queue
from datetime import datetime

from qiskit.providers.exceptions import JobTimeoutError
from qiskit.providers.backend import Backend
from qiskit.providers.jobstatus import JobStatus, JOB_FINAL_STATES

Expand Down Expand Up @@ -128,14 +126,12 @@ def __init__(
def result(
self,
timeout: Optional[float] = None,
wait: float = 5,
decoder: Optional[Type[ResultDecoder]] = None
) -> Any:
"""Return the results of the job.

Args:
timeout: Number of seconds to wait for job.
wait: Seconds between queries.
decoder: A :class:`ResultDecoder` subclass used to decode job results.

Returns:
Expand All @@ -146,7 +142,7 @@ def result(
"""
_decoder = decoder or self._result_decoder
if self._results is None or (_decoder != self._result_decoder):
self.wait_for_final_state(timeout=timeout, wait=wait)
self.wait_for_final_state(timeout=timeout)
if self._status == JobStatus.ERROR:
raise RuntimeJobFailureError(f"Unable to retrieve job result. "
f"{self.error_message()}")
Expand Down Expand Up @@ -188,30 +184,20 @@ def error_message(self) -> Optional[str]:
self._set_status_and_error_message()
return self._error_message

def wait_for_final_state(
self,
timeout: Optional[float] = None,
wait: float = 5
) -> None:
"""Poll the job status until it progresses to a final state such as ``DONE`` or ``ERROR``.
def wait_for_final_state(self, timeout: Optional[float] = None) -> None:
"""If the websocket server stream endpoint is open, wait for it to close.

Args:
timeout: Seconds to wait for the job. If ``None``, wait indefinitely.
wait: Seconds between queries.

Raises:
JobTimeoutError: If the job does not reach a final state before the
specified timeout.
"""
start_time = time.time()
status = self.status()
while status not in JOB_FINAL_STATES:
elapsed_time = time.time() - start_time
if timeout is not None and elapsed_time >= timeout:
raise JobTimeoutError(
'Timeout while waiting for job {}.'.format(self.job_id))
time.sleep(wait)
status = self.status()
if self._status not in JOB_FINAL_STATES:
self._ws_client_future = self._executor.submit(self._start_websocket_client)
self._ws_client_future.result(timeout)
self.status()

def stream_results(
self,
Expand All @@ -233,16 +219,18 @@ def stream_results(
RuntimeInvalidStateError: If a callback function is already streaming results or
if the job already finished.
"""
if self._is_streaming():
raise RuntimeInvalidStateError("A callback function is already streaming results.")
Comment thread
rathishcholarajan marked this conversation as resolved.

if self._status in JOB_FINAL_STATES:
raise RuntimeInvalidStateError("Job already finished.")

self._ws_client_future = self._executor.submit(self._start_websocket_client)
self._executor.submit(self._stream_results,
result_queue=self._result_queue, user_callback=callback,
decoder=decoder)
if self._is_streaming():
try:
self._executor.submit(self._stream_results, result_queue=self._result_queue,
user_callback=callback, decoder=decoder)
except Exception:
raise RuntimeInvalidStateError("A callback function is already streaming results.")
else:
self._ws_client_future = self._executor.submit(self._start_websocket_client)
self._executor.submit(self._stream_results, result_queue=self._result_queue,
user_callback=callback, decoder=decoder)

def cancel_result_streaming(self) -> None:
"""Cancel result streaming."""
Expand Down
56 changes: 40 additions & 16 deletions test/ibm/runtime/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,11 @@ def test_run_program(self):
self.assertIsInstance(job, RuntimeJob)
self.assertIsInstance(job.status(), JobStatus)
self.assertEqual(job.inputs, params)
job.wait_for_final_state()
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(3)):
job.wait_for_final_state()
self.assertTrue(job.result())
self.assertEqual(job.status(), JobStatus.DONE)
self.assertTrue(job.result())

def test_run_program_with_custom_runtime_image(self):
"""Test running program."""
Expand All @@ -437,9 +439,11 @@ def test_run_program_with_custom_runtime_image(self):
self.assertIsInstance(job, RuntimeJob)
self.assertIsInstance(job.status(), JobStatus)
self.assertEqual(job.inputs, params)
job.wait_for_final_state()
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(3)):
job.wait_for_final_state()
self.assertTrue(job.result())
self.assertEqual(job.status(), JobStatus.DONE)
self.assertTrue(job.result())
self.assertEqual(job.image, image)

def test_retrieve_program_data(self):
Expand Down Expand Up @@ -479,24 +483,32 @@ def test_program_params_namespace(self):
def test_run_program_failed(self):
"""Test a failed program execution."""
job = self._run_program(job_classes=FailedRuntimeJob)
job.wait_for_final_state()
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(5)):
job.wait_for_final_state()
job_result_raw = self.runtime._api_client.job_results(job.job_id)
self.assertEqual(JobStatus.ERROR, job.status())
self.assertEqual(API_TO_JOB_ERROR_MESSAGE['FAILED'].format(
job.job_id, job_result_raw), job.error_message())
with self.assertRaises(RuntimeJobFailureError):
job.result()
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(5)):
job.result()

def test_run_program_failed_ran_too_long(self):
"""Test a program that failed since it ran longer than maxiumum execution time."""
job = self._run_program(job_classes=FailedRanTooLongRuntimeJob)
job.wait_for_final_state()
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(3)):
job.wait_for_final_state()
job_result_raw = self.runtime._api_client.job_results(job.job_id)
self.assertEqual(JobStatus.ERROR, job.status())
self.assertEqual(API_TO_JOB_ERROR_MESSAGE['CANCELLED - RAN TOO LONG'].format(
job.job_id, job_result_raw), job.error_message())
with self.assertRaises(RuntimeJobFailureError):
job.result()
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(3)):
job.result()

def test_retrieve_job(self):
"""Test retrieving a job."""
Expand Down Expand Up @@ -630,8 +642,10 @@ def test_jobs_filter_by_program_id(self):
program_id_1 = self._upload_program()
job = self._run_program(program_id=program_id)
job_1 = self._run_program(program_id=program_id_1)
job.wait_for_final_state()
job_1.wait_for_final_state()
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(1)):
job.wait_for_final_state()
job_1.wait_for_final_state()
rjobs = self.runtime.jobs(program_id=program_id)
self.assertEqual(program_id, rjobs[0].program_id)
self.assertEqual(1, len(rjobs))
Expand All @@ -641,7 +655,9 @@ def test_jobs_filter_by_provider(self):
program_id = self._upload_program()
job = self._run_program(program_id=program_id,
hub="defaultHub", group="defaultGroup", project="defaultProject")
job.wait_for_final_state()
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(1)):
job.wait_for_final_state()
rjobs = self.runtime.jobs(program_id=program_id,
hub="defaultHub", group="defaultGroup", project="defaultProject")
self.assertEqual(program_id, rjobs[0].program_id)
Expand All @@ -664,7 +680,9 @@ def test_cancel_job(self):
def test_final_result(self):
"""Test getting final result."""
job = self._run_program()
result = job.result()
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(5)):
result = job.result()
self.assertTrue(result)

def test_job_status(self):
Expand All @@ -688,7 +706,9 @@ def test_job_program_id(self):
def test_wait_for_final_state(self):
"""Test wait for final state."""
job = self._run_program()
job.wait_for_final_state()
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(3)):
job.wait_for_final_state()
self.assertEqual(JobStatus.DONE, job.status())

def test_result_decoder(self):
Expand All @@ -701,7 +721,9 @@ def test_result_decoder(self):
for result_decoder, decoder in sub_tests:
with self.subTest(decoder=decoder):
job = self._run_program(job_classes=job_cls, decoder=result_decoder)
result = job.result(decoder=decoder)
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(10)):
result = job.result(decoder=decoder)
self.assertIsInstance(result['serializable_class'], SerializableClass)

def test_get_result_twice(self):
Expand All @@ -711,8 +733,10 @@ def test_get_result_twice(self):
job_cls.custom_result = custom_result

job = self._run_program(job_classes=job_cls)
_ = job.result()
_ = job.result()
with mock.patch.object(RuntimeJob, 'wait_for_final_state',
side_effect=time.sleep(5)):
_ = job.result()
_ = job.result()

def test_program_metadata(self):
"""Test program metadata."""
Expand Down