Use websocket stream to retrieve runtime job results #175
Use websocket stream to retrieve runtime job results #175kt474 wants to merge 64 commits intoQiskit:mainfrom
Conversation
* Remove version field from runtime program * Add release note
This needs to change in the program upload body request in order to meet the IBM Cloud API guidance. Co-authored-by: Jessie Yu <jessieyu@us.ibm.com>
* Remove version field from runtime program * Add release note
This needs to change in the program upload body request in order to meet the IBM Cloud API guidance. Co-authored-by: Jessie Yu <jessieyu@us.ibm.com>
* Accept JSON schema as program metadata * Update qiskit_ibm/runtime/ibm_runtime_service.py Co-authored-by: Jessie Yu <jessieyu@us.ibm.com> * Apply suggestions from code review * Apply suggestions from code review Co-authored-by: Jessie Yu <jessieyu@us.ibm.com> Co-authored-by: Jessie Yu <jessieyu@us.ibm.com>
* update runtime metadata * return if no data * fix mypy * Update releasenotes/notes/update-runtime-metadata-d2ddbcfc0d034530.yaml Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com>
* retrieve program data * refetch once if no program data * remove unused import * refresh program on data property * fix lint * Update qiskit_ibm/runtime/runtime_program.py Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com> * Update qiskit_ibm/runtime/runtime_program.py Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com> * Update qiskit_ibm/runtime/runtime_program.py Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com> * add test case * add test case * add default data constant * add _validate_program method * Update test/ibm/runtime/test_runtime.py Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com>
rathishcholarajan
left a comment
There was a problem hiding this comment.
The test_runtime.py unit test suite is throwing some websocket errors in most of the tests after this update. (but tests are still passing) See here - https://github.com/Qiskit-Partners/qiskit-ibm/runs/4112663719?check_suite_focus=true#step:5:201
We need to figure out why that's happening. You can compare the logs with runtime-release-q4 PR runs (those are clean).
| if not self._is_streaming(): | ||
| self._ws_client_future = self._executor.submit(self._start_websocket_client) |
There was a problem hiding this comment.
So if the job is already done (i.e. self._ws_client_future.done() is true), we'll still open a connection? Presumably the server will just close it immediate?
Also if I call this method, which times out, then I try to do stream_results() (maybe trying to figure out why the job is taking so long), it's going to tell me a callback function is already streaming results, which is not true.
There was a problem hiding this comment.
I suppose we could do if self._ws_client_future is None we can open a connection and avoid the other case
As for if wait_for_final_state() times out and then stream_results() is called, is there a better way to handle this other than just updating the error message?
There was a problem hiding this comment.
I suppose we could do if
self._ws_client_future is Nonewe can open a connection and avoid the other case
Except we also have the cancel_result_streaming() method which would cancel the future without setting it to None. Instead of relying on the status of the future, we can just check if status is in JOB_FINAL_STATES and only open a connection if it isn't.
As for if wait_for_final_state() times out and then stream_results() is called, is there a better way to handle this other than just updating the error message?
Yeah you can update stream_results() to not raise immediately if there is a stream already open. You can instead start calling the user's callback function with message from the stream.
| if timeout is not None and elapsed_time >= timeout: | ||
| raise JobTimeoutError( | ||
| 'Timeout while waiting for job {}.'.format(self.job_id)) | ||
| time.sleep(wait) |
There was a problem hiding this comment.
Since the reason to switch to ws streaming is to avoid polling, you can just do self._ws_client_future.result(timeout).
There was a problem hiding this comment.
You don't need this sleep() now that you have future.result(), or the while loop.
* add provider param * split provider into hub/group/project * add reno * wip add test case * fix lint/docs * Update qiskit_ibm/runtime/ibm_runtime_service.py Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com> * Update releasenotes/notes/filter-jobs-by-provider-dead04faaf223840.yaml Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com> * refactor test cases * remove print * add integration test Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com>
* wip add limit/offset params * add reno * refactor & update test case * offset -> skip, implement refresh logic * refresh when skip/limit not default * Update releasenotes/notes/runtime-program-pagination-8d599ae984a5ce33.yaml Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com> * Update releasenotes/notes/runtime-program-pagination-8d599ae984a5ce33.yaml Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com> * add to test case * refactor refresh logic * refactor * fix lint * add integration test * update doc string * Update qiskit_ibm/api/clients/runtime.py Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com> * cleanup merge * Apply suggestions from code review * fix lint refactor * Fetch all programs upfront 20 at a time and store in cache For subsequent requests paginate and return from cache * Fix integration test Co-authored-by: Rathish Cholarajan <rathishc24@gmail.com> Co-authored-by: Rathish Cholarajan <Rathish.C@ibm.com>
|
This is failing one integration test test_stream_results_done - which I'm not sure I understand. We're trying to force call stream results on a completed job with interim results and expect the callback function to not be called so the variable |
|
closing, moved to Qiskit/qiskit-ibm-runtime#33 |
Summary
fixes #117
Details and comments