diff --git a/.github/workflows/cron-prod.yml b/.github/workflows/cron-prod.yml index 37f6108867..f1652c5861 100644 --- a/.github/workflows/cron-prod.yml +++ b/.github/workflows/cron-prod.yml @@ -75,3 +75,33 @@ jobs: pip install -U -c constraints.txt -r requirements-dev.txt - name: Run Tests run: make test2 + test3: + name: tests3-python${{ matrix.python-version }}-${{ matrix.os }} + runs-on: ${{ matrix.os }} + strategy: + matrix: + python-version: [3.7, 3.8, 3.9] + os: ["windows-latest", "ubuntu-latest"] + env: + QISKIT_IBM_API_TOKEN: ${{ secrets.QISKIT_IBM_API_TOKEN }} + QISKIT_IBM_API_URL: ${{ secrets.QISKIT_IBM_API_URL }} + QISKIT_IBM_HGP: ${{ secrets.QISKIT_IBM_HGP }} + QISKIT_IBM_CLOUD_TOKEN: ${{ secrets.QISKIT_IBM_CLOUD_TOKEN }} + QISKIT_IBM_CLOUD_URL: ${{ secrets.QISKIT_IBM_CLOUD_URL }} + QISKIT_IBM_CLOUD_CRN: ${{ secrets.QISKIT_IBM_CLOUD_CRN }} + LOG_LEVEL: DEBUG + STREAM_LOG: True + QISKIT_IN_PARALLEL: True + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install Deps + run: | + python -m pip install --upgrade pip + pip install -c constraints.txt -e . + pip install -U -c constraints.txt -r requirements-dev.txt + - name: Run Tests + run: make test3 diff --git a/.github/workflows/cron-staging.yml b/.github/workflows/cron-staging.yml index dd5fafba3e..460f6136f6 100644 --- a/.github/workflows/cron-staging.yml +++ b/.github/workflows/cron-staging.yml @@ -77,3 +77,34 @@ jobs: pip install -U -c constraints.txt -r requirements-dev.txt - name: Run Tests run: make test2 + test3: + name: tests3-python${{ matrix.python-version }}-${{ matrix.os }} + runs-on: ${{ matrix.os }} + strategy: + matrix: + python-version: [3.7, 3.8, 3.9] + os: ["windows-latest", "ubuntu-latest"] + env: + QISKIT_IBM_STAGING_API_TOKEN: ${{ secrets.QISKIT_IBM_STAGING_API_TOKEN }} + QISKIT_IBM_STAGING_API_URL: ${{ secrets.QISKIT_IBM_STAGING_API_URL }} + QISKIT_IBM_STAGING_HGP: ${{ secrets.QISKIT_IBM_STAGING_HGP }} + QISKIT_IBM_STAGING_CLOUD_TOKEN: ${{ secrets.QISKIT_IBM_STAGING_CLOUD_TOKEN }} + QISKIT_IBM_STAGING_CLOUD_URL: ${{ secrets.QISKIT_IBM_STAGING_CLOUD_URL }} + QISKIT_IBM_STAGING_CLOUD_CRN: ${{ secrets.QISKIT_IBM_STAGING_CLOUD_CRN }} + QISKIT_IBM_USE_STAGING_CREDENTIALS: True + LOG_LEVEL: DEBUG + STREAM_LOG: True + QISKIT_IN_PARALLEL: True + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install Deps + run: | + python -m pip install --upgrade pip + pip install -c constraints.txt -e . + pip install -U -c constraints.txt -r requirements-dev.txt + - name: Run Tests + run: make test3 diff --git a/Makefile b/Makefile index 93f05b7139..46ab98b362 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ # that they have been altered from the originals. -.PHONY: lint style test mypy test1 test2 test3 runtime_integration +.PHONY: lint style test mypy test1 test2 test3 lint: pylint -rn qiskit_ibm_runtime test @@ -32,8 +32,8 @@ test1: test2: python -m unittest -v test/test_integration_job.py -runtime_integration: - python -m unittest -v test/ibm/runtime/test_runtime_integration.py +test3: + python -m unittest -v test/test_integration_retrieve_job.py test/test_integration_interim_results.py black: black qiskit_ibm_runtime setup.py test \ No newline at end of file diff --git a/test/ibm_test_case.py b/test/ibm_test_case.py index 399a0ba537..682fd7526f 100644 --- a/test/ibm_test_case.py +++ b/test/ibm_test_case.py @@ -13,16 +13,22 @@ """Custom TestCase for IBM Provider.""" import os +import copy import logging import inspect -from unittest import TestCase +import unittest +from contextlib import suppress +from collections import defaultdict from qiskit_ibm_runtime import QISKIT_IBM_RUNTIME_LOGGER_NAME +from qiskit_ibm_runtime.exceptions import IBMNotAuthorizedError from .utils.utils import setup_test_logging +from .utils.decorators import requires_cloud_legacy_services +from .utils.templates import RUNTIME_PROGRAM, RUNTIME_PROGRAM_METADATA, PROGRAM_PREFIX -class IBMTestCase(TestCase): +class IBMTestCase(unittest.TestCase): """Custom TestCase for use with the Qiskit IBM Runtime.""" @classmethod @@ -55,3 +61,137 @@ def _set_logging_level(cls, logger: logging.Logger) -> None: ): logger.addHandler(logging.StreamHandler()) logger.propagate = False + + +class IBMIntegrationTestCase(IBMTestCase): + """Custom integration test case for use with the Qiskit IBM Runtime.""" + + @classmethod + @requires_cloud_legacy_services + def setUpClass(cls, services): + """Initial class level setup.""" + # pylint: disable=arguments-differ + super().setUpClass() + cls.services = services + + def setUp(self) -> None: + """Test level setup.""" + super().setUp() + self.to_delete = defaultdict(list) + self.to_cancel = defaultdict(list) + + def tearDown(self) -> None: + """Test level teardown.""" + super().tearDown() + # Delete programs + for service in self.services: + for prog in self.to_delete[service.auth]: + with suppress(Exception): + service.delete_program(prog) + + # Cancel and delete jobs. + for service in self.services: + for job in self.to_cancel[service.auth]: + with suppress(Exception): + job.cancel() + with suppress(Exception): + service.delete_job(job.job_id) + + def _upload_program( + self, + service, + name=None, + max_execution_time=300, + data=None, + is_public: bool = False, + ): + """Upload a new program.""" + name = name or PROGRAM_PREFIX + data = data or RUNTIME_PROGRAM + metadata = copy.deepcopy(RUNTIME_PROGRAM_METADATA) + metadata["name"] = name + metadata["max_execution_time"] = max_execution_time + metadata["is_public"] = is_public + program_id = service.upload_program(data=data, metadata=metadata) + self.to_delete[service.auth].append(program_id) + return program_id + + +class IBMIntegrationJobTestCase(IBMIntegrationTestCase): + """Custom integration test case for job-related tests.""" + + @classmethod + def setUpClass(cls): + """Initial class level setup.""" + # pylint: disable=arguments-differ + # pylint: disable=no-value-for-parameter + super().setUpClass() + cls._create_default_program() + cls._find_sim_backends() + + @classmethod + def tearDownClass(cls) -> None: + """Class level teardown.""" + super().tearDownClass() + # Delete default program. + with suppress(Exception): + for service in cls.services: + service.delete_program(cls.program_ids[service.auth]) + cls.log.debug( + "Deleted %s program %s", service.auth, cls.program_ids[service.auth] + ) + + @classmethod + def _create_default_program(cls): + """Create a default program.""" + metadata = copy.deepcopy(RUNTIME_PROGRAM_METADATA) + metadata["name"] = PROGRAM_PREFIX + cls.program_ids = {} + cls.sim_backends = {} + for service in cls.services: + try: + prog_id = service.upload_program( + data=RUNTIME_PROGRAM, metadata=metadata + ) + cls.log.debug("Uploaded %s program %s", service.auth, prog_id) + cls.program_ids[service.auth] = prog_id + except IBMNotAuthorizedError: + raise unittest.SkipTest("No upload access.") + + @classmethod + def _find_sim_backends(cls): + """Find a simulator backend for each service.""" + for service in cls.services: + cls.sim_backends[service.auth] = service.backends(simulator=True)[0].name() + + def _run_program( + self, + service, + program_id=None, + iterations=1, + inputs=None, + interim_results=None, + final_result=None, + callback=None, + backend=None, + ): + """Run a program.""" + self.log.debug("Running program on %s", service.auth) + inputs = ( + inputs + if inputs is not None + else { + "iterations": iterations, + "interim_results": interim_results or {}, + "final_result": final_result or {}, + } + ) + pid = program_id or self.program_ids[service.auth] + backend_name = backend or self.sim_backends[service.auth] + options = {"backend_name": backend_name} + job = service.run( + program_id=pid, inputs=inputs, options=options, callback=callback + ) + self.log.info("Runtime job %s submitted.", job.job_id) + self.to_cancel[service.auth].append(job) + return job diff --git a/test/test_integration_backend.py b/test/test_integration_backend.py index 4ae24451d1..ba138ab2a9 100644 --- a/test/test_integration_backend.py +++ b/test/test_integration_backend.py @@ -14,25 +14,16 @@ from unittest import SkipTest -from .ibm_test_case import IBMTestCase +from .ibm_test_case import IBMIntegrationTestCase from .utils.decorators import ( - requires_cloud_legacy_services, run_cloud_legacy_real, requires_cloud_legacy_devices, ) -class TestIntegrationBackend(IBMTestCase): +class TestIntegrationBackend(IBMIntegrationTestCase): """Integration tests for backend functions.""" - @classmethod - @requires_cloud_legacy_services - def setUpClass(cls, services): - """Initial class level setup.""" - # pylint: disable=arguments-differ - super().setUpClass() - cls.services = services - @run_cloud_legacy_real def test_backends(self, service): """Test getting all backends.""" @@ -53,7 +44,7 @@ def test_get_backend(self, service): self.assertTrue(backend) -class TestIBMBackend(IBMTestCase): +class TestIBMBackend(IBMIntegrationTestCase): """Test ibm_backend module.""" @classmethod @@ -61,6 +52,7 @@ class TestIBMBackend(IBMTestCase): def setUpClass(cls, devices): """Initial class level setup.""" # pylint: disable=arguments-differ + # pylint: disable=no-value-for-parameter super().setUpClass() cls.devices = devices diff --git a/test/test_integration_interim_results.py b/test/test_integration_interim_results.py new file mode 100644 index 0000000000..a904229e20 --- /dev/null +++ b/test/test_integration_interim_results.py @@ -0,0 +1,200 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Tests for job functions using real runtime service.""" + +import time + +from qiskit.providers.jobstatus import JobStatus + +from .ibm_test_case import IBMIntegrationJobTestCase +from .utils.decorators import run_cloud_legacy_real +from .utils.utils import cancel_job_safe, wait_for_status +from .mock.proxy_server import MockProxyServer, use_proxies + + +class TestIntegrationInterimResults(IBMIntegrationJobTestCase): + """Integration tests for interim result functions.""" + + @run_cloud_legacy_real + def test_interim_result_callback(self, service): + """Test interim result callback.""" + + def result_callback(job_id, interim_result): + nonlocal final_it + final_it = interim_result["iteration"] + nonlocal callback_err + if job_id != job.job_id: + callback_err.append(f"Unexpected job ID: {job_id}") + if interim_result["interim_results"] != int_res: + callback_err.append(f"Unexpected interim result: {interim_result}") + + int_res = "foo" + final_it = 0 + callback_err = [] + iterations = 3 + job = self._run_program( + service, + iterations=iterations, + interim_results=int_res, + callback=result_callback, + ) + job.wait_for_final_state() + self.assertEqual(iterations - 1, final_it) + self.assertFalse(callback_err) + self.assertIsNotNone(job._ws_client._server_close_code) + + @run_cloud_legacy_real + def test_stream_results(self, service): + """Test stream_results method.""" + + def result_callback(job_id, interim_result): + nonlocal final_it + final_it = interim_result["iteration"] + nonlocal callback_err + if job_id != job.job_id: + callback_err.append(f"Unexpected job ID: {job_id}") + if interim_result["interim_results"] != int_res: + callback_err.append(f"Unexpected interim result: {interim_result}") + + int_res = "bar" + final_it = 0 + callback_err = [] + iterations = 3 + job = self._run_program(service, iterations=iterations, interim_results=int_res) + job.stream_results(result_callback) + job.wait_for_final_state() + self.assertEqual(iterations - 1, final_it) + self.assertFalse(callback_err) + self.assertIsNotNone(job._ws_client._server_close_code) + + @run_cloud_legacy_real + def test_stream_results_done(self, service): + """Test streaming interim results after job is done.""" + + def result_callback(job_id, interim_result): + # pylint: disable=unused-argument + nonlocal called_back + called_back = True + + called_back = False + job = self._run_program(service, interim_results="foobar") + job.wait_for_final_state() + job._status = JobStatus.RUNNING # Allow stream_results() + job.stream_results(result_callback) + time.sleep(2) + self.assertFalse(called_back) + self.assertIsNotNone(job._ws_client._server_close_code) + + @run_cloud_legacy_real + def test_retrieve_interim_results(self, service): + """Test retrieving interim results with API endpoint""" + int_res = "foo" + job = self._run_program(service, interim_results=int_res) + job.wait_for_final_state() + interim_results = job.interim_results() + self.assertIn(int_res, interim_results[0]) + + @run_cloud_legacy_real + def test_callback_error(self, service): + """Test error in callback method.""" + + def result_callback(job_id, interim_result): + # pylint: disable=unused-argument + if interim_result["iteration"] == 0: + raise ValueError("Kaboom!") + nonlocal final_it + final_it = interim_result["iteration"] + + final_it = 0 + iterations = 3 + with self.assertLogs("qiskit_ibm_runtime", level="WARNING") as err_cm: + job = self._run_program( + service, + iterations=iterations, + interim_results="foo", + callback=result_callback, + ) + job.wait_for_final_state() + + self.assertIn("Kaboom", ", ".join(err_cm.output)) + self.assertEqual(iterations - 1, final_it) + self.assertIsNotNone(job._ws_client._server_close_code) + + @run_cloud_legacy_real + def test_callback_cancel_job(self, service): + """Test canceling a running job while streaming results.""" + + def result_callback(job_id, interim_result): + # pylint: disable=unused-argument + nonlocal final_it + final_it = interim_result["iteration"] + + final_it = 0 + iterations = 5 + sub_tests = [JobStatus.QUEUED, JobStatus.RUNNING] + + for status in sub_tests: + with self.subTest(status=status): + if status == JobStatus.QUEUED: + _ = self._run_program(service, iterations=10) + + job = self._run_program( + service=service, + iterations=iterations, + interim_results="foo", + callback=result_callback, + ) + wait_for_status(job, status) + if not cancel_job_safe(job, self.log): + return + time.sleep(3) # Wait for cleanup + self.assertIsNotNone(job._ws_client._server_close_code) + self.assertLess(final_it, iterations) + + @run_cloud_legacy_real + def test_websocket_proxy(self, service): + """Test connecting to websocket via proxy.""" + + def result_callback(job_id, interim_result): # pylint: disable=unused-argument + nonlocal callback_called + callback_called = True + + MockProxyServer(self, self.log).start() + callback_called = False + + with use_proxies(service, MockProxyServer.VALID_PROXIES): + job = self._run_program(service, iterations=1, callback=result_callback) + job.wait_for_final_state() + + self.assertTrue(callback_called) + + @run_cloud_legacy_real + def test_websocket_proxy_invalid_port(self, service): + """Test connecting to websocket via invalid proxy port.""" + + def result_callback(job_id, interim_result): # pylint: disable=unused-argument + nonlocal callback_called + callback_called = True + + callback_called = False + invalid_proxy = { + "https": "http://{}:{}".format( + MockProxyServer.PROXY_IP_ADDRESS, MockProxyServer.INVALID_PROXY_PORT + ) + } + # TODO - verify WebsocketError in output log. For some reason self.assertLogs + # doesn't always work even when the error is clearly logged. + with use_proxies(service, invalid_proxy): + job = self._run_program(service, iterations=2, callback=result_callback) + job.wait_for_final_state() + self.assertFalse(callback_called) diff --git a/test/test_integration_job.py b/test/test_integration_job.py index add4542148..12606d58cd 100644 --- a/test/test_integration_job.py +++ b/test/test_integration_job.py @@ -12,103 +12,32 @@ """Tests for job functions using real runtime service.""" -import copy -import unittest -import uuid import time import random -from contextlib import suppress -from collections import defaultdict -from qiskit.providers.jobstatus import JobStatus, JOB_FINAL_STATES -from qiskit.providers.exceptions import QiskitBackendNotFoundError +from qiskit.providers.jobstatus import JobStatus from qiskit.test.decorators import slow_test from qiskit_ibm_runtime.constants import API_TO_JOB_ERROR_MESSAGE -from qiskit_ibm_runtime.exceptions import IBMNotAuthorizedError from qiskit_ibm_runtime.exceptions import ( - RuntimeDuplicateProgramError, RuntimeJobFailureError, RuntimeInvalidStateError, RuntimeJobNotFound, ) -from .ibm_test_case import IBMTestCase -from .utils.decorators import requires_cloud_legacy_services, run_cloud_legacy_real -from .utils.templates import RUNTIME_PROGRAM, RUNTIME_PROGRAM_METADATA, PROGRAM_PREFIX +from .ibm_test_case import IBMIntegrationJobTestCase +from .utils.decorators import run_cloud_legacy_real from .utils.serialization import ( get_complex_types, SerializableClassDecoder, SerializableClass, ) -from .utils.utils import cancel_job_safe -from .mock.proxy_server import MockProxyServer, use_proxies +from .utils.utils import cancel_job_safe, wait_for_status, get_real_device -class TestIntegrationJob(IBMTestCase): +class TestIntegrationJob(IBMIntegrationJobTestCase): """Integration tests for job functions.""" - @classmethod - @requires_cloud_legacy_services - def setUpClass(cls, services): - """Initial class level setup.""" - # pylint: disable=arguments-differ - super().setUpClass() - cls.services = services - metadata = copy.deepcopy(RUNTIME_PROGRAM_METADATA) - metadata["name"] = cls._get_program_name() - cls.program_ids = {} - cls.sim_backends = {} - cls.real_backends = {} - for service in services: - try: - prog_id = service.upload_program( - data=RUNTIME_PROGRAM, metadata=metadata - ) - cls.log.debug("Uploaded %s program %s", service.auth, prog_id) - cls.program_ids[service.auth] = prog_id - except RuntimeDuplicateProgramError: - pass - except IBMNotAuthorizedError: - raise unittest.SkipTest("No upload access.") - - cls.sim_backends[service.auth] = service.backends(simulator=True)[0].name() - - @classmethod - def tearDownClass(cls) -> None: - """Class level teardown.""" - super().tearDownClass() - with suppress(Exception): - for service in cls.services: - service.delete_program(cls.program_ids[service.auth]) - cls.log.debug( - "Deleted %s program %s", service.auth, cls.program_ids[service.auth] - ) - - def setUp(self) -> None: - """Test level setup.""" - super().setUp() - self.poll_time = 1 - self.to_delete = defaultdict(list) - self.to_cancel = defaultdict(list) - - def tearDown(self) -> None: - """Test level teardown.""" - super().tearDown() - # Delete programs - for service in self.services: - for prog in self.to_delete[service.auth]: - with suppress(Exception): - service.delete_program(prog) - - # Cancel and delete jobs. - for service in self.services: - for job in self.to_cancel[service.auth]: - with suppress(Exception): - job.cancel() - with suppress(Exception): - service.delete_job(job.job_id) - @run_cloud_legacy_real def test_run_program(self, service): """Test running a program.""" @@ -121,7 +50,7 @@ def test_run_program(self, service): @run_cloud_legacy_real def test_run_program_real_device(self, service): """Test running a program.""" - device = self._get_real_device(service) + device = get_real_device(service) job = self._run_program(service, final_result="foo", backend=device) result = job.result() self.assertEqual(JobStatus.DONE, job.status()) @@ -164,127 +93,13 @@ def test_run_program_failed_ran_too_long(self, service): with self.assertRaises(RuntimeJobFailureError): job.result() - @run_cloud_legacy_real - def test_retrieve_job_queued(self, service): - """Test retrieving a queued job.""" - real_device = self._get_real_device(service) - _ = self._run_program(service, iterations=10, backend=real_device) - job = self._run_program(service, iterations=2, backend=real_device) - self._wait_for_status(job, JobStatus.QUEUED) - rjob = service.job(job.job_id) - self.assertEqual(job.job_id, rjob.job_id) - self.assertEqual(self.program_ids[service.auth], rjob.program_id) - - @run_cloud_legacy_real - def test_retrieve_job_running(self, service): - """Test retrieving a running job.""" - job = self._run_program(service, iterations=10) - self._wait_for_status(job, JobStatus.RUNNING) - rjob = service.job(job.job_id) - self.assertEqual(job.job_id, rjob.job_id) - self.assertEqual(self.program_ids[service.auth], rjob.program_id) - - @run_cloud_legacy_real - def test_retrieve_job_done(self, service): - """Test retrieving a finished job.""" - job = self._run_program(service) - job.wait_for_final_state() - rjob = service.job(job.job_id) - self.assertEqual(job.job_id, rjob.job_id) - self.assertEqual(self.program_ids[service.auth], rjob.program_id) - - @run_cloud_legacy_real - def test_retrieve_all_jobs(self, service): - """Test retrieving all jobs.""" - job = self._run_program(service) - rjobs = service.jobs() - found = False - for rjob in rjobs: - if rjob.job_id == job.job_id: - self.assertEqual(job.program_id, rjob.program_id) - self.assertEqual(job.inputs, rjob.inputs) - found = True - break - self.assertTrue(found, f"Job {job.job_id} not returned.") - - @run_cloud_legacy_real - def test_retrieve_jobs_limit(self, service): - """Test retrieving jobs with limit.""" - jobs = [] - for _ in range(3): - jobs.append(self._run_program(service)) - - rjobs = service.jobs(limit=2, program_id=self.program_ids[service.auth]) - self.assertEqual(len(rjobs), 2, f"Retrieved jobs: {[j.job_id for j in rjobs]}") - job_ids = {job.job_id for job in jobs} - rjob_ids = {rjob.job_id for rjob in rjobs} - self.assertTrue( - rjob_ids.issubset(job_ids), f"Submitted: {job_ids}, Retrieved: {rjob_ids}" - ) - - @run_cloud_legacy_real - def test_retrieve_pending_jobs(self, service): - """Test retrieving pending jobs (QUEUED, RUNNING).""" - job = self._run_program(service, iterations=10) - self._wait_for_status(job, JobStatus.RUNNING) - rjobs = service.jobs(pending=True) - found = False - for rjob in rjobs: - if rjob.job_id == job.job_id: - self.assertEqual(job.program_id, rjob.program_id) - self.assertEqual(job.inputs, rjob.inputs) - found = True - break - self.assertTrue(found, f"Pending job {job.job_id} not retrieved.") - - @run_cloud_legacy_real - def test_retrieve_returned_jobs(self, service): - """Test retrieving returned jobs (COMPLETED, FAILED, CANCELLED).""" - job = self._run_program(service) - job.wait_for_final_state() - rjobs = service.jobs(pending=False) - found = False - for rjob in rjobs: - if rjob.job_id == job.job_id: - self.assertEqual(job.program_id, rjob.program_id) - self.assertEqual(job.inputs, rjob.inputs) - found = True - break - self.assertTrue(found, f"Returned job {job.job_id} not retrieved.") - - @run_cloud_legacy_real - def test_retrieve_jobs_by_program_id(self, service): - """Test retrieving jobs by Program ID.""" - program_id = self._upload_program(service) - job = self._run_program(service, program_id=program_id) - job.wait_for_final_state() - rjobs = service.jobs(program_id=program_id) - self.assertEqual(program_id, rjobs[0].program_id) - self.assertEqual(1, len(rjobs), f"Retrieved jobs: {[j.job_id for j in rjobs]}") - - def test_jobs_filter_by_hgp(self): - """Test retrieving jobs by hgp.""" - service = [serv for serv in self.services if serv.auth == "legacy"][0] - default_hgp = list(service._hgps.keys())[0] - program_id = self._upload_program(service) - job = self._run_program(service, program_id=program_id) - job.wait_for_final_state() - rjobs = service.jobs(program_id=program_id, instance=default_hgp) - self.assertEqual(program_id, rjobs[0].program_id) - self.assertEqual(1, len(rjobs), f"Retrieved jobs: {[j.job_id for j in rjobs]}") - - uuid_ = uuid.uuid4().hex - fake_hgp = f"{uuid_}/{uuid_}/{uuid_}" - rjobs = service.jobs(program_id=program_id, instance=fake_hgp) - self.assertFalse(rjobs) - @run_cloud_legacy_real def test_cancel_job_queued(self, service): """Test canceling a queued job.""" - real_device = self._get_real_device(service) + real_device = get_real_device(service) _ = self._run_program(service, iterations=10, backend=real_device) job = self._run_program(service, iterations=2, backend=real_device) - self._wait_for_status(job, JobStatus.QUEUED) + wait_for_status(job, JobStatus.QUEUED) if not cancel_job_safe(job, self.log): return time.sleep(10) # Wait a bit for DB to update. @@ -295,7 +110,7 @@ def test_cancel_job_queued(self, service): def test_cancel_job_running(self, service): """Test canceling a running job.""" job = self._run_program(service, iterations=3) - self._wait_for_status(job, JobStatus.RUNNING) + wait_for_status(job, JobStatus.RUNNING) if not cancel_job_safe(job, self.log): return time.sleep(10) # Wait a bit for DB to update. @@ -317,7 +132,7 @@ def test_delete_job(self, service): for status in sub_tests: with self.subTest(status=status): job = self._run_program(service, iterations=2) - self._wait_for_status(job, status) + wait_for_status(job, status) service.delete_job(job.job_id) with self.assertRaises(RuntimeJobNotFound): service.job(job.job_id) @@ -325,150 +140,14 @@ def test_delete_job(self, service): @run_cloud_legacy_real def test_delete_job_queued(self, service): """Test deleting a queued job.""" - real_device = self._get_real_device(service) + real_device = get_real_device(service) _ = self._run_program(service, iterations=10, backend=real_device) job = self._run_program(service, iterations=2, backend=real_device) - self._wait_for_status(job, JobStatus.QUEUED) + wait_for_status(job, JobStatus.QUEUED) service.delete_job(job.job_id) with self.assertRaises(RuntimeJobNotFound): service.job(job.job_id) - @run_cloud_legacy_real - def test_interim_result_callback(self, service): - """Test interim result callback.""" - - def result_callback(job_id, interim_result): - nonlocal final_it - final_it = interim_result["iteration"] - nonlocal callback_err - if job_id != job.job_id: - callback_err.append(f"Unexpected job ID: {job_id}") - if interim_result["interim_results"] != int_res: - callback_err.append(f"Unexpected interim result: {interim_result}") - - int_res = "foo" - final_it = 0 - callback_err = [] - iterations = 3 - job = self._run_program( - service, - iterations=iterations, - interim_results=int_res, - callback=result_callback, - ) - job.wait_for_final_state() - self.assertEqual(iterations - 1, final_it) - self.assertFalse(callback_err) - self.assertIsNotNone(job._ws_client._server_close_code) - - @run_cloud_legacy_real - def test_stream_results(self, service): - """Test stream_results method.""" - - def result_callback(job_id, interim_result): - nonlocal final_it - final_it = interim_result["iteration"] - nonlocal callback_err - if job_id != job.job_id: - callback_err.append(f"Unexpected job ID: {job_id}") - if interim_result["interim_results"] != int_res: - callback_err.append(f"Unexpected interim result: {interim_result}") - - int_res = "bar" - final_it = 0 - callback_err = [] - iterations = 3 - job = self._run_program(service, iterations=iterations, interim_results=int_res) - job.stream_results(result_callback) - job.wait_for_final_state() - self.assertEqual(iterations - 1, final_it) - self.assertFalse(callback_err) - self.assertIsNotNone(job._ws_client._server_close_code) - - @run_cloud_legacy_real - def test_stream_results_done(self, service): - """Test streaming interim results after job is done.""" - - def result_callback(job_id, interim_result): - # pylint: disable=unused-argument - nonlocal called_back - called_back = True - - called_back = False - job = self._run_program(service, interim_results="foobar") - job.wait_for_final_state() - job._status = JobStatus.RUNNING # Allow stream_results() - job.stream_results(result_callback) - time.sleep(2) - self.assertFalse(called_back) - self.assertIsNotNone(job._ws_client._server_close_code) - - @run_cloud_legacy_real - def test_retrieve_interim_results(self, service): - """Test retrieving interim results with API endpoint""" - int_res = "foo" - job = self._run_program(service, interim_results=int_res) - job.wait_for_final_state() - interim_results = job.interim_results() - self.assertIn(int_res, interim_results[0]) - - @run_cloud_legacy_real - def test_callback_error(self, service): - """Test error in callback method.""" - - def result_callback(job_id, interim_result): - # pylint: disable=unused-argument - if interim_result["iteration"] == 0: - raise ValueError("Kaboom!") - nonlocal final_it - final_it = interim_result["iteration"] - - final_it = 0 - iterations = 3 - with self.assertLogs("qiskit_ibm_runtime", level="WARNING") as err_cm: - job = self._run_program( - service, - iterations=iterations, - interim_results="foo", - callback=result_callback, - ) - job.wait_for_final_state() - - self.assertIn("Kaboom", ", ".join(err_cm.output)) - self.assertEqual(iterations - 1, final_it) - self.assertIsNotNone(job._ws_client._server_close_code) - - @run_cloud_legacy_real - def test_callback_cancel_job(self, service): - """Test canceling a running job while streaming results.""" - - def result_callback(job_id, interim_result): - # pylint: disable=unused-argument - nonlocal final_it - final_it = interim_result["iteration"] - - final_it = 0 - iterations = 5 - sub_tests = [JobStatus.QUEUED, JobStatus.RUNNING] - - for status in sub_tests: - with self.subTest(status=status): - if status == JobStatus.QUEUED: - _ = self._run_program(service, iterations=10) - - job = self._run_program( - service=service, - iterations=iterations, - interim_results="foo", - callback=result_callback, - ) - self._wait_for_status(job, status) - if not cancel_job_safe(job, self.log): - return - time.sleep(3) # Wait for cleanup - self.assertIsNotNone(job._ws_client._server_close_code) - self.assertLess(final_it, iterations) - @run_cloud_legacy_real def test_final_result(self, service): """Test getting final result.""" @@ -539,44 +218,6 @@ def test_job_creation_date(self, service): for rjob in rjobs: self.assertTrue(rjob.creation_date) - @run_cloud_legacy_real - def test_websocket_proxy(self, service): - """Test connecting to websocket via proxy.""" - - def result_callback(job_id, interim_result): # pylint: disable=unused-argument - nonlocal callback_called - callback_called = True - - MockProxyServer(self, self.log).start() - callback_called = False - - with use_proxies(service, MockProxyServer.VALID_PROXIES): - job = self._run_program(service, iterations=1, callback=result_callback) - job.wait_for_final_state() - - self.assertTrue(callback_called) - - @run_cloud_legacy_real - def test_websocket_proxy_invalid_port(self, service): - """Test connecting to websocket via invalid proxy port.""" - - def result_callback(job_id, interim_result): # pylint: disable=unused-argument - nonlocal callback_called - callback_called = True - - callback_called = False - invalid_proxy = { - "https": "http://{}:{}".format( - MockProxyServer.PROXY_IP_ADDRESS, MockProxyServer.INVALID_PROXY_PORT - ) - } - # TODO - verify WebsocketError in output log. For some reason self.assertLogs - # doesn't always work even when the error is clearly logged. - with use_proxies(service, invalid_proxy): - job = self._run_program(service, iterations=2, callback=result_callback) - job.wait_for_final_state() - self.assertFalse(callback_called) - @run_cloud_legacy_real def test_job_logs(self, service): """Test job logs.""" @@ -588,31 +229,6 @@ def test_job_logs(self, service): self.assertIn("this is a stdout message", job_logs) self.assertIn("this is a stderr message", job_logs) - def _upload_program( - self, - service, - name=None, - max_execution_time=300, - data=None, - is_public: bool = False, - ): - """Upload a new program.""" - name = name or self._get_program_name() - data = data or RUNTIME_PROGRAM - metadata = copy.deepcopy(RUNTIME_PROGRAM_METADATA) - metadata["name"] = name - metadata["max_execution_time"] = max_execution_time - metadata["is_public"] = is_public - program_id = service.upload_program(data=data, metadata=metadata) - self.log.info("Uploaded runtime program %s", program_id) - self.to_delete[service.auth].append(program_id) - return program_id - - @classmethod - def _get_program_name(cls): - """Return a unique program name.""" - return PROGRAM_PREFIX + "_" + uuid.uuid4().hex - def _assert_complex_types_equal(self, expected, received): """Verify the received data in complex types is expected.""" if "serializable_class" in received: @@ -620,52 +236,3 @@ def _assert_complex_types_equal(self, expected, received): received["serializable_class"] ) self.assertEqual(expected, received) - - def _run_program( - self, - service, - program_id=None, - iterations=1, - inputs=None, - interim_results=None, - final_result=None, - callback=None, - backend=None, - ): - """Run a program.""" - self.log.debug("Running program on %s", service.auth) - inputs = ( - inputs - if inputs is not None - else { - "iterations": iterations, - "interim_results": interim_results or {}, - "final_result": final_result or {}, - } - ) - pid = program_id or self.program_ids[service.auth] - backend_name = backend or self.sim_backends[service.auth] - options = {"backend_name": backend_name} - job = service.run( - program_id=pid, inputs=inputs, options=options, callback=callback - ) - self.log.info("Runtime job %s submitted.", job.job_id) - self.to_cancel[service.auth].append(job) - return job - - def _wait_for_status(self, job, status): - """Wait for job to reach a certain status.""" - wait_time = 1 if status == JobStatus.QUEUED else self.poll_time - while job.status() not in JOB_FINAL_STATES + (status,): - time.sleep(wait_time) - if job.status() != status: - self.skipTest(f"Job {job.job_id} unable to reach status {status}.") - - def _get_real_device(self, service): - try: - # TODO: Remove filters when ibmq_berlin is removed - return service.least_busy( - simulator=False, filters=lambda b: b.name() != "ibmq_berlin" - ).name() - except QiskitBackendNotFoundError: - raise unittest.SkipTest("No real device") # cloud has no real device diff --git a/test/test_integration_program.py b/test/test_integration_program.py index 9c2d5800cf..7c194249a7 100644 --- a/test/test_integration_program.py +++ b/test/test_integration_program.py @@ -12,13 +12,9 @@ """Tests for runtime service.""" -import copy import unittest import os -import uuid -from contextlib import suppress import tempfile -from collections import defaultdict from qiskit_ibm_runtime.exceptions import IBMNotAuthorizedError from qiskit_ibm_runtime.runtime_program import RuntimeProgram @@ -26,36 +22,14 @@ RuntimeProgramNotFound, ) -from .ibm_test_case import IBMTestCase -from .utils.decorators import requires_cloud_legacy_services, run_cloud_legacy_real -from .utils.templates import RUNTIME_PROGRAM, RUNTIME_PROGRAM_METADATA, PROGRAM_PREFIX +from .ibm_test_case import IBMIntegrationTestCase +from .utils.decorators import run_cloud_legacy_real +from .utils.templates import RUNTIME_PROGRAM, PROGRAM_PREFIX -class TestIntegrationProgram(IBMTestCase): +class TestIntegrationProgram(IBMIntegrationTestCase): """Integration tests for runtime modules.""" - @classmethod - @requires_cloud_legacy_services - def setUpClass(cls, services): - """Initial class level setup.""" - # pylint: disable=arguments-differ - super().setUpClass() - cls.services = services - - def setUp(self) -> None: - """Test level setup.""" - super().setUp() - self.to_delete = defaultdict(list) - - def tearDown(self) -> None: - """Test level teardown.""" - super().tearDown() - # Delete programs - for service in self.services: - for prog in self.to_delete[service.auth]: - with suppress(Exception): - service.delete_program(prog) - @run_cloud_legacy_real def test_list_programs(self, service): """Test listing programs.""" @@ -210,7 +184,7 @@ def test_update_program_metadata(self, service): program_id = self._upload_program(service) original = service.program(program_id) new_metadata = { - "name": self._get_program_name(), + "name": PROGRAM_PREFIX, "description": "test_update_program_metadata", "max_execution_time": original.max_execution_time + 100, "spec": { @@ -233,27 +207,3 @@ def _validate_program(self, program): self.assertTrue(program.max_execution_time) self.assertTrue(program.creation_date) self.assertTrue(program.update_date) - - def _upload_program( - self, - service, - name=None, - max_execution_time=300, - data=None, - is_public: bool = False, - ): - """Upload a new program.""" - name = name or self._get_program_name() - data = data or RUNTIME_PROGRAM - metadata = copy.deepcopy(RUNTIME_PROGRAM_METADATA) - metadata["name"] = name - metadata["max_execution_time"] = max_execution_time - metadata["is_public"] = is_public - program_id = service.upload_program(data=data, metadata=metadata) - self.to_delete[service.auth].append(program_id) - return program_id - - @classmethod - def _get_program_name(cls): - """Return a unique program name.""" - return PROGRAM_PREFIX + "_" + uuid.uuid4().hex diff --git a/test/test_integration_retrieve_job.py b/test/test_integration_retrieve_job.py new file mode 100644 index 0000000000..92d997f139 --- /dev/null +++ b/test/test_integration_retrieve_job.py @@ -0,0 +1,139 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Tests for job functions using real runtime service.""" + +import uuid + +from qiskit.providers.jobstatus import JobStatus + +from .ibm_test_case import IBMIntegrationJobTestCase +from .utils.decorators import run_cloud_legacy_real +from .utils.utils import wait_for_status, get_real_device + + +class TestIntegrationRetrieveJob(IBMIntegrationJobTestCase): + """Integration tests for job retrieval functions.""" + + @run_cloud_legacy_real + def test_retrieve_job_queued(self, service): + """Test retrieving a queued job.""" + real_device = get_real_device(service) + _ = self._run_program(service, iterations=10, backend=real_device) + job = self._run_program(service, iterations=2, backend=real_device) + wait_for_status(job, JobStatus.QUEUED) + rjob = service.job(job.job_id) + self.assertEqual(job.job_id, rjob.job_id) + self.assertEqual(self.program_ids[service.auth], rjob.program_id) + + @run_cloud_legacy_real + def test_retrieve_job_running(self, service): + """Test retrieving a running job.""" + job = self._run_program(service, iterations=10) + wait_for_status(job, JobStatus.RUNNING) + rjob = service.job(job.job_id) + self.assertEqual(job.job_id, rjob.job_id) + self.assertEqual(self.program_ids[service.auth], rjob.program_id) + + @run_cloud_legacy_real + def test_retrieve_job_done(self, service): + """Test retrieving a finished job.""" + job = self._run_program(service) + job.wait_for_final_state() + rjob = service.job(job.job_id) + self.assertEqual(job.job_id, rjob.job_id) + self.assertEqual(self.program_ids[service.auth], rjob.program_id) + + @run_cloud_legacy_real + def test_retrieve_all_jobs(self, service): + """Test retrieving all jobs.""" + job = self._run_program(service) + rjobs = service.jobs() + found = False + for rjob in rjobs: + if rjob.job_id == job.job_id: + self.assertEqual(job.program_id, rjob.program_id) + self.assertEqual(job.inputs, rjob.inputs) + found = True + break + self.assertTrue(found, f"Job {job.job_id} not returned.") + + @run_cloud_legacy_real + def test_retrieve_jobs_limit(self, service): + """Test retrieving jobs with limit.""" + jobs = [] + for _ in range(3): + jobs.append(self._run_program(service)) + + rjobs = service.jobs(limit=2, program_id=self.program_ids[service.auth]) + self.assertEqual(len(rjobs), 2, f"Retrieved jobs: {[j.job_id for j in rjobs]}") + job_ids = {job.job_id for job in jobs} + rjob_ids = {rjob.job_id for rjob in rjobs} + self.assertTrue( + rjob_ids.issubset(job_ids), f"Submitted: {job_ids}, Retrieved: {rjob_ids}" + ) + + @run_cloud_legacy_real + def test_retrieve_pending_jobs(self, service): + """Test retrieving pending jobs (QUEUED, RUNNING).""" + job = self._run_program(service, iterations=10) + wait_for_status(job, JobStatus.RUNNING) + rjobs = service.jobs(pending=True) + found = False + for rjob in rjobs: + if rjob.job_id == job.job_id: + self.assertEqual(job.program_id, rjob.program_id) + self.assertEqual(job.inputs, rjob.inputs) + found = True + break + self.assertTrue(found, f"Pending job {job.job_id} not retrieved.") + + @run_cloud_legacy_real + def test_retrieve_returned_jobs(self, service): + """Test retrieving returned jobs (COMPLETED, FAILED, CANCELLED).""" + job = self._run_program(service) + job.wait_for_final_state() + rjobs = service.jobs(pending=False) + found = False + for rjob in rjobs: + if rjob.job_id == job.job_id: + self.assertEqual(job.program_id, rjob.program_id) + self.assertEqual(job.inputs, rjob.inputs) + found = True + break + self.assertTrue(found, f"Returned job {job.job_id} not retrieved.") + + @run_cloud_legacy_real + def test_retrieve_jobs_by_program_id(self, service): + """Test retrieving jobs by Program ID.""" + program_id = self._upload_program(service) + job = self._run_program(service, program_id=program_id) + job.wait_for_final_state() + rjobs = service.jobs(program_id=program_id) + self.assertEqual(program_id, rjobs[0].program_id) + self.assertEqual(1, len(rjobs), f"Retrieved jobs: {[j.job_id for j in rjobs]}") + + def test_jobs_filter_by_hgp(self): + """Test retrieving jobs by hgp.""" + service = [serv for serv in self.services if serv.auth == "legacy"][0] + default_hgp = list(service._hgps.keys())[0] + program_id = self._upload_program(service) + job = self._run_program(service, program_id=program_id) + job.wait_for_final_state() + rjobs = service.jobs(program_id=program_id, instance=default_hgp) + self.assertEqual(program_id, rjobs[0].program_id) + self.assertEqual(1, len(rjobs), f"Retrieved jobs: {[j.job_id for j in rjobs]}") + + uuid_ = uuid.uuid4().hex + fake_hgp = f"{uuid_}/{uuid_}/{uuid_}" + rjobs = service.jobs(program_id=program_id, instance=fake_hgp) + self.assertFalse(rjobs) diff --git a/test/utils/utils.py b/test/utils/utils.py index 438a992b27..484cece80a 100644 --- a/test/utils/utils.py +++ b/test/utils/utils.py @@ -14,9 +14,12 @@ import os import logging +import time +import unittest from qiskit import QuantumCircuit from qiskit.providers.jobstatus import JOB_FINAL_STATES, JobStatus +from qiskit.providers.exceptions import QiskitBackendNotFoundError from qiskit_ibm_runtime.hub_group_project import HubGroupProject from qiskit_ibm_runtime import IBMRuntimeService from qiskit_ibm_runtime.ibm_backend import IBMBackend @@ -113,3 +116,24 @@ def cancel_job_safe(job: RuntimeJob, logger: logging.Logger) -> bool: logger.warning("Unable to cancel job because it's already done.") return False raise + + +def wait_for_status(job, status, poll_time=1, time_out=20): + """Wait for job to reach a certain status.""" + wait_time = 1 if status == JobStatus.QUEUED else poll_time + while job.status() not in JOB_FINAL_STATES + (status,) and time_out > 0: + time.sleep(wait_time) + time_out -= wait_time + if job.status() != status: + raise unittest.SkipTest(f"Job {job.job_id} unable to reach status {status}.") + + +def get_real_device(service): + """Get a real device for the service.""" + try: + # TODO: Remove filters when ibmq_berlin is removed + return service.least_busy( + simulator=False, filters=lambda b: b.name() != "ibmq_berlin" + ).name() + except QiskitBackendNotFoundError: + raise unittest.SkipTest("No real device") # cloud has no real device