diff --git a/qiskit_ibm_runtime/session.py b/qiskit_ibm_runtime/session.py index c9bdc75f19..d211632adf 100644 --- a/qiskit_ibm_runtime/session.py +++ b/qiskit_ibm_runtime/session.py @@ -15,6 +15,7 @@ from typing import Dict, Optional, Type, Union, Callable, Any from types import TracebackType from functools import wraps +from threading import Lock from qiskit_ibm_provider.utils.converters import hms_to_seconds @@ -115,6 +116,7 @@ def __init__( backend = backend.name self._backend = backend + self._setup_lock = Lock() self._session_id: Optional[str] = None self._active = True self._max_time = ( @@ -154,22 +156,28 @@ def run( options["backend"] = self._backend if not self._session_id: + # Make sure only one thread can send the session starter job. + self._setup_lock.acquire() # TODO: What happens if session max time != first job max time? # Use session max time if this is first job. options["session_time"] = self._max_time - job = self._service.run( - program_id=program_id, - options=options, - inputs=inputs, - session_id=self._session_id, - start_session=self._session_id is None, - callback=callback, - result_decoder=result_decoder, - ) - - if self._session_id is None: - self._session_id = job.job_id() + try: + job = self._service.run( + program_id=program_id, + options=options, + inputs=inputs, + session_id=self._session_id, + start_session=self._session_id is None, + callback=callback, + result_decoder=result_decoder, + ) + + if self._session_id is None: + self._session_id = job.job_id() + finally: + if self._setup_lock.locked(): + self._setup_lock.release() if self._backend is None: self._backend = job.backend().name diff --git a/releasenotes/notes/thread-safe-sessions-d08c8367e98447e7.yaml b/releasenotes/notes/thread-safe-sessions-d08c8367e98447e7.yaml new file mode 100644 index 0000000000..4880e2d535 --- /dev/null +++ b/releasenotes/notes/thread-safe-sessions-d08c8367e98447e7.yaml @@ -0,0 +1,6 @@ +--- +prelude: > + Sessions are now thread-safe and allow for multiple concurrent interactive + experiments. +features: + - Sessions are now thread-safe. diff --git a/test/unit/test_session.py b/test/unit/test_session.py index f796f4b40c..ee554909f8 100644 --- a/test/unit/test_session.py +++ b/test/unit/test_session.py @@ -12,7 +12,11 @@ """Tests for Session classession.""" -from unittest.mock import MagicMock, patch +import sys +import time +from concurrent.futures import ThreadPoolExecutor, wait + +from unittest.mock import MagicMock, Mock, patch from qiskit_ibm_runtime import Session from qiskit_ibm_runtime.ibm_backend import IBMBackend @@ -113,6 +117,29 @@ def test_run(self): self.assertEqual(session.session_id, job.job_id()) self.assertEqual(session.backend(), backend) + def test_run_is_thread_safe(self): + """Test the session sends a session starter job once, and only once.""" + service = MagicMock() + api = MagicMock() + service._api_client = api + + def _wait_a_bit(*args, **kwargs): + # pylint: disable=unused-argument + switchinterval = sys.getswitchinterval() + time.sleep(switchinterval * 2) + return MagicMock() + + service.run = Mock(side_effect=_wait_a_bit) + + session = Session(service=service, backend="ibm_gotham") + with ThreadPoolExecutor(max_workers=2) as executor: + results = list(map(lambda _: executor.submit(session.run, "", {}), range(5))) + wait(results) + + calls = service.run.call_args_list + session_starters = list(filter(lambda c: c.kwargs["start_session"] is True, calls)) + self.assertEqual(len(session_starters), 1) + def test_close_without_run(self): """Test closing without run.""" service = MagicMock()