diff --git a/qiskit_ibm_runtime/accounts/account.py b/qiskit_ibm_runtime/accounts/account.py index 607134c141..3f81ba9628 100644 --- a/qiskit_ibm_runtime/accounts/account.py +++ b/qiskit_ibm_runtime/accounts/account.py @@ -68,7 +68,7 @@ def __init__( self, auth: AccountType, token: str, - url: Optional[str], + url: Optional[str] = None, instance: Optional[str] = None, # TODO: add validation for proxies input format proxies: Optional[dict] = None, @@ -120,3 +120,17 @@ def get_auth_handler(self) -> AuthBase: return CloudAuth(api_key=self.token, crn=self.instance) return LegacyAuth(access_token=self.token) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Account): + return False + return all( + [ + self.auth == other.auth, + self.token == other.token, + self.url == other.url, + self.instance == other.instance, + self.proxies == other.proxies, + self.verify == other.verify, + ] + ) diff --git a/qiskit_ibm_runtime/accounts/management.py b/qiskit_ibm_runtime/accounts/management.py index 0ecaf24e91..d95d9778d3 100644 --- a/qiskit_ibm_runtime/accounts/management.py +++ b/qiskit_ibm_runtime/accounts/management.py @@ -13,12 +13,12 @@ """Account management related classes and functions.""" import os -from typing import Optional, Union +from typing import Optional, Dict from .account import Account, AccountType from .storage import save_config, read_config, delete_config -_DEFAULT_ACCOUNG_CONFIG_JSON_FILE = os.path.join( +_DEFAULT_ACCOUNT_CONFIG_JSON_FILE = os.path.join( os.path.expanduser("~"), ".qiskit", "qiskit-ibm.json" ) _DEFAULT_ACCOUNT_NAME = "default" @@ -31,8 +31,9 @@ class AccountManager: """Class that bundles account management related functionality.""" - @staticmethod + @classmethod def save( + cls, token: Optional[str] = None, url: Optional[str] = None, instance: Optional[str] = None, @@ -43,9 +44,10 @@ def save( ) -> None: """Save account on disk.""" + config_key = name or cls._get_default_account_name(auth) return save_config( - filename=_DEFAULT_ACCOUNG_CONFIG_JSON_FILE, - name=name, + filename=_DEFAULT_ACCOUNT_CONFIG_JSON_FILE, + name=config_key, config=Account( token=token, url=url, @@ -57,10 +59,51 @@ def save( ) @staticmethod - def list() -> Union[dict, None]: + def list( + default: Optional[bool] = None, + auth: Optional[str] = None, + name: Optional[str] = None, + ) -> Dict[str, Account]: """List all accounts saved on disk.""" - return read_config(filename=_DEFAULT_ACCOUNG_CONFIG_JSON_FILE) + def _matching_name(account_name: str) -> bool: + return name is None or name == account_name + + def _matching_auth(account: Account) -> bool: + return auth is None or account.auth == auth + + def _matching_default(account_name: str) -> bool: + default_accounts = [ + _DEFAULT_ACCOUNT_NAME, + _DEFAULT_ACCOUNT_NAME_LEGACY, + _DEFAULT_ACCOUNT_NAME_CLOUD, + ] + if default is None: + return True + elif default is False: + return account_name not in default_accounts + else: + return account_name in default_accounts + + # load all accounts + all_accounts = map( + lambda kv: (kv[0], Account.from_saved_format(kv[1])), + read_config(filename=_DEFAULT_ACCOUNT_CONFIG_JSON_FILE).items(), + ) + + # filter based on input parameters + filtered_accounts = dict( + list( + filter( + lambda kv: _matching_auth(kv[1]) + and _matching_default(kv[0]) + and _matching_name(kv[0]), + all_accounts, + ) + ) + ) + + return filtered_accounts @classmethod def get( @@ -80,7 +123,7 @@ def get( """ if name: saved_account = read_config( - filename=_DEFAULT_ACCOUNG_CONFIG_JSON_FILE, name=name + filename=_DEFAULT_ACCOUNT_CONFIG_JSON_FILE, name=name ) if not saved_account: raise ValueError( @@ -95,14 +138,14 @@ def get( if auth: saved_account = read_config( - filename=_DEFAULT_ACCOUNG_CONFIG_JSON_FILE, + filename=_DEFAULT_ACCOUNT_CONFIG_JSON_FILE, name=cls._get_default_account_name(auth), ) if saved_account is None: raise ValueError(f"No default {auth} account saved.") return Account.from_saved_format(saved_account) - all_config = read_config(filename=_DEFAULT_ACCOUNG_CONFIG_JSON_FILE) + all_config = read_config(filename=_DEFAULT_ACCOUNT_CONFIG_JSON_FILE) for account_type in _ACCOUNT_TYPES: account_name = cls._get_default_account_name(account_type) if account_name in all_config: @@ -110,10 +153,18 @@ def get( return None - @staticmethod - def delete(name: Optional[str] = _DEFAULT_ACCOUNT_NAME) -> bool: + @classmethod + def delete( + cls, + name: Optional[str] = None, + auth: Optional[str] = None, + ) -> bool: """Delete account from disk.""" - return delete_config(name=name, filename=_DEFAULT_ACCOUNG_CONFIG_JSON_FILE) + + config_key = name or cls._get_default_account_name(auth) + return delete_config( + name=config_key, filename=_DEFAULT_ACCOUNT_CONFIG_JSON_FILE + ) @classmethod def _from_env_variables(cls, auth: Optional[AccountType]) -> Optional[Account]: @@ -129,7 +180,7 @@ def _from_env_variables(cls, auth: Optional[AccountType]) -> Optional[Account]: @classmethod def _get_default_account_name(cls, auth: AccountType) -> str: return ( - _DEFAULT_ACCOUNT_NAME_CLOUD - if auth == "cloud" - else _DEFAULT_ACCOUNT_NAME_LEGACY + _DEFAULT_ACCOUNT_NAME_LEGACY + if auth == "legacy" + else _DEFAULT_ACCOUNT_NAME_CLOUD ) diff --git a/qiskit_ibm_runtime/accounts/storage.py b/qiskit_ibm_runtime/accounts/storage.py index 79cc3c4d6f..fa8cc56038 100644 --- a/qiskit_ibm_runtime/accounts/storage.py +++ b/qiskit_ibm_runtime/accounts/storage.py @@ -13,9 +13,12 @@ """Utility functions related to storing account configuration on disk.""" import json +import logging import os from typing import Optional, Dict +logger = logging.getLogger(__name__) + def save_config( filename: str, @@ -23,7 +26,7 @@ def save_config( config: dict, ) -> None: """Save configuration data in a JSON file under the given name.""" - + logger.debug("Save configuration data for '%s' in '%s'", name, filename) _ensure_file_exists(filename) with open(filename, mode="r") as json_in: @@ -38,8 +41,8 @@ def read_config( filename: str, name: Optional[str] = None, ) -> Optional[Dict]: - """Save configuration data from a JSON file.""" - + """Read configuration data from a JSON file.""" + logger.debug("Read configuration data for '%s' from '%s'", name, filename) _ensure_file_exists(filename) with open(filename) as json_file: @@ -58,6 +61,8 @@ def delete_config( ) -> bool: """Delete configuration data from a JSON file.""" + logger.debug("Delete configuration data for '%s' from '%s'", name, filename) + _ensure_file_exists(filename) with open(filename, mode="r") as json_in: data = json.load(json_in) @@ -73,6 +78,7 @@ def delete_config( def _ensure_file_exists(filename: str, initial_content: str = "{}") -> None: if not os.path.isfile(filename): + logger.debug("Create empty configuration file at %s", filename) # create parent directories os.makedirs(os.path.dirname(filename), exist_ok=True) diff --git a/qiskit_ibm_runtime/ibm_runtime_service.py b/qiskit_ibm_runtime/ibm_runtime_service.py index c8100250f8..bbb3bbb39e 100644 --- a/qiskit_ibm_runtime/ibm_runtime_service.py +++ b/qiskit_ibm_runtime/ibm_runtime_service.py @@ -517,18 +517,20 @@ def active_account(self) -> Optional[Dict[str, str]]: return self._account.to_saved_format() @staticmethod - def delete_account(name: Optional[str]) -> bool: + def delete_account(name: Optional[str] = None, auth: Optional[str] = None) -> bool: """Delete a saved account from disk. Args: - name: Custom name of the saved account. Defaults to "default". + name: Name of the saved account to delete. + auth: Authentication type of the default account to delete. + Ignored if account name is provided. Returns: - True if the account with the given name was deleted. - False if no account was found for the given name. + True if the account was deleted. + False if no account was found. """ - return AccountManager.delete(name=name) + return AccountManager.delete(name=name, auth=auth) @staticmethod def save_account( @@ -565,9 +567,18 @@ def save_account( ) @staticmethod - def saved_accounts() -> dict: + def saved_accounts( + default: Optional[bool] = None, + auth: Optional[str] = None, + name: Optional[str] = None, + ) -> dict: """List the accounts saved on disk. + Args: + default: If set to True, only default accounts are returned. + auth: If set, only accounts with the given authentication type are returned. + name: If set, only accounts with the given name are returned. + Returns: A dictionary with information about the accounts saved on disk. @@ -575,7 +586,13 @@ def saved_accounts() -> dict: IBMProviderCredentialsInvalidUrl: If invalid IBM Quantum credentials are found on disk. """ - return AccountManager.list() + + return dict( + map( + lambda kv: (kv[0], Account.to_saved_format(kv[1])), + AccountManager.list(default=default, auth=auth, name=name).items(), + ), + ) def get_backend( self, diff --git a/test/test_account.py b/test/test_account.py index b8966d10e5..9b4fcecc92 100644 --- a/test/test_account.py +++ b/test/test_account.py @@ -12,20 +12,169 @@ """Tests for the account functions.""" +import json import uuid import logging import os from unittest import skipIf from qiskit_ibm_runtime.accounts.account import CLOUD_API_URL, LEGACY_API_URL +from qiskit_ibm_runtime.accounts import AccountManager, Account, management from qiskit_ibm_runtime.exceptions import IBMInputValueError from .ibm_test_case import IBMTestCase from .mock.fake_runtime_service import FakeRuntimeService -from .utils.account import get_qiskitrc_contents, custom_qiskitrc, no_envs, custom_envs +from .utils.account import ( + get_account_config_contents, + temporary_account_config_file, + no_envs, + custom_envs, +) + +_TEST_LEGACY_ACCOUNT = Account( + auth="legacy", + token="token-x", + url="https://auth.quantum-computing.ibm.com/api", + instance="ibm-q/open/main", +) + +_TEST_CLOUD_ACCOUNT = Account( + auth="cloud", + token="token-y", + url="https://cloud.ibm.com", + instance="crn:v1:bluemix:public:quantum-computing:us-east:a/...::", +) + +# NamedTemporaryFiles not supported in Windows +@skipIf(os.name == "nt", "Test not supported in Windows") +class TestAccountManager(IBMTestCase): + """Tests for AccountManager class.""" + + @temporary_account_config_file(contents={}) + def test_save_get(self): + """Test save and get.""" + + # Each tuple contains the + # - account to save + # - the name passed to AccountManager.save + # - the name passed to AccountManager.get + sub_tests = [ + # verify accounts can be saved and retrieved via custom names + (_TEST_LEGACY_ACCOUNT, "acct-1", "acct-1"), + (_TEST_CLOUD_ACCOUNT, "acct-2", "acct-2"), + # verify default account name handling for cloud accounts + (_TEST_CLOUD_ACCOUNT, None, management._DEFAULT_ACCOUNT_NAME_CLOUD), + (_TEST_CLOUD_ACCOUNT, None, None), + # verify default account name handling for legacy accounts + (_TEST_LEGACY_ACCOUNT, None, management._DEFAULT_ACCOUNT_NAME_LEGACY), + # verify account override + (_TEST_LEGACY_ACCOUNT, "acct", "acct"), + (_TEST_CLOUD_ACCOUNT, "acct", "acct"), + ] + for account, name_save, name_get in sub_tests: + with self.subTest( + f"for account type '{account.auth}' " + f"using `save(name={name_save})` and `get(name={name_get})`" + ): + AccountManager.save( + token=account.token, + url=account.url, + instance=account.instance, + auth=account.auth, + proxies=account.proxies, + verify=account.verify, + name=name_save, + ) + self.assertEqual(account, AccountManager.get(name=name_get)) + + @temporary_account_config_file( + contents=json.dumps( + { + "cloud": _TEST_CLOUD_ACCOUNT.to_saved_format(), + "legacy": _TEST_LEGACY_ACCOUNT.to_saved_format(), + } + ) + ) + def test_list(self): + """Test list.""" + + with temporary_account_config_file( + contents={ + "key1": _TEST_CLOUD_ACCOUNT.to_saved_format(), + "key2": _TEST_LEGACY_ACCOUNT.to_saved_format(), + } + ), self.subTest("non-empty list of accounts"): + accounts = AccountManager.list() + + self.assertEqual(len(accounts), 2) + self.assertEqual(accounts["key1"], _TEST_CLOUD_ACCOUNT) + self.assertTrue(accounts["key2"], _TEST_LEGACY_ACCOUNT) + + with temporary_account_config_file(contents={}), self.subTest( + "empty list of accounts" + ): + self.assertEqual(len(AccountManager.list()), 0) + + with temporary_account_config_file( + contents={ + "key1": _TEST_CLOUD_ACCOUNT.to_saved_format(), + "key2": _TEST_LEGACY_ACCOUNT.to_saved_format(), + management._DEFAULT_ACCOUNT_NAME_CLOUD: Account( + "cloud", "token-legacy" + ).to_saved_format(), + management._DEFAULT_ACCOUNT_NAME_LEGACY: Account( + "legacy", "token-cloud" + ).to_saved_format(), + } + ), self.subTest("filtered list of accounts"): + accounts = list(AccountManager.list(auth="cloud").keys()) + self.assertEqual(len(accounts), 2) + self.assertListEqual( + accounts, ["key1", management._DEFAULT_ACCOUNT_NAME_CLOUD] + ) + + accounts = list(AccountManager.list(auth="legacy").keys()) + self.assertEqual(len(accounts), 2) + self.assertListEqual( + accounts, ["key2", management._DEFAULT_ACCOUNT_NAME_LEGACY] + ) + + accounts = list(AccountManager.list(auth="cloud", default=True).keys()) + self.assertEqual(len(accounts), 1) + self.assertListEqual(accounts, [management._DEFAULT_ACCOUNT_NAME_CLOUD]) + + accounts = list(AccountManager.list(auth="cloud", default=False).keys()) + self.assertEqual(len(accounts), 1) + self.assertListEqual(accounts, ["key1"]) + + accounts = list(AccountManager.list(name="key1").keys()) + self.assertEqual(len(accounts), 1) + self.assertListEqual(accounts, ["key1"]) + + @temporary_account_config_file( + contents={ + "key1": _TEST_CLOUD_ACCOUNT.to_saved_format(), + management._DEFAULT_ACCOUNT_NAME_LEGACY: _TEST_LEGACY_ACCOUNT.to_saved_format(), + management._DEFAULT_ACCOUNT_NAME_CLOUD: _TEST_CLOUD_ACCOUNT.to_saved_format(), + } + ) + def test_delete(self): + """Test delete.""" + + with self.subTest("delete named account"): + self.assertTrue(AccountManager.delete(name="key1")) + self.assertFalse(AccountManager.delete(name="key1")) + + with self.subTest("delete default legacy account"): + self.assertTrue(AccountManager.delete(auth="legacy")) + with self.subTest("delete default cloud account"): + self.assertTrue(AccountManager.delete()) -# NamedTemporaryFiles not support in Windows + self.assertTrue(len(AccountManager.list()) == 0) + + +# NamedTemporaryFiles not supported in Windows @skipIf(os.name == "nt", "Test not supported in Windows") class TestEnableAccount(IBMTestCase): """Tests for IBMRuntimeService enable account.""" @@ -34,7 +183,7 @@ def test_enable_account_by_name(self): """Test initializing account by name.""" name = "foo" token = uuid.uuid4().hex - with custom_qiskitrc(name=name, token=token): + with temporary_account_config_file(name=name, token=token): service = FakeRuntimeService(name=name) self.assertTrue(service._account) @@ -45,7 +194,7 @@ def test_enable_account_by_auth(self): for auth in ["cloud", "legacy"]: with self.subTest(auth=auth), no_envs(["QISKIT_IBM_API_TOKEN"]): token = uuid.uuid4().hex - with custom_qiskitrc(auth=auth, token=token): + with temporary_account_config_file(auth=auth, token=token): service = FakeRuntimeService(auth=auth) self.assertTrue(service._account) self.assertEqual(service._account.token, token) @@ -75,7 +224,9 @@ def test_enable_account_by_name_and_other(self): name = "foo" token = uuid.uuid4().hex for param in subtests: - with self.subTest(param=param), custom_qiskitrc(name=name, token=token): + with self.subTest(param=param), temporary_account_config_file( + name=name, token=token + ): with self.assertLogs("qiskit_ibm_runtime", logging.WARNING) as logged: service = FakeRuntimeService(name=name, **param) @@ -111,7 +262,7 @@ def test_enable_account_by_auth_url(self): for auth in subtests: with self.subTest(auth=auth): token = uuid.uuid4().hex - with custom_qiskitrc(auth=auth, token=token), no_envs( + with temporary_account_config_file(auth=auth, token=token), no_envs( ["QISKIT_IBM_API_TOKEN"] ): with self.assertLogs( @@ -131,7 +282,7 @@ def test_enable_account_by_only_auth(self): for auth in subtests: with self.subTest(auth=auth): token = uuid.uuid4().hex - with custom_qiskitrc(auth=auth, token=token), no_envs( + with temporary_account_config_file(auth=auth, token=token), no_envs( ["QISKIT_IBM_API_TOKEN"] ): service = FakeRuntimeService() @@ -144,9 +295,13 @@ def test_enable_account_by_only_auth(self): def test_enable_account_both_auth(self): """Test initializing account with both saved types.""" token = uuid.uuid4().hex - contents = get_qiskitrc_contents(auth="cloud", token=token) - contents.update(get_qiskitrc_contents(auth="legacy", token=uuid.uuid4().hex)) - with custom_qiskitrc(contents=contents), no_envs(["QISKIT_IBM_API_TOKEN"]): + contents = get_account_config_contents(auth="cloud", token=token) + contents.update( + get_account_config_contents(auth="legacy", token=uuid.uuid4().hex) + ) + with temporary_account_config_file(contents=contents), no_envs( + ["QISKIT_IBM_API_TOKEN"] + ): service = FakeRuntimeService() self.assertTrue(service._account) self.assertEqual(service._account.token, token) @@ -193,14 +348,14 @@ def test_enable_account_by_env_token_url(self): def test_enable_account_bad_name(self): """Test initializing account by bad name.""" name = "phantom" - with custom_qiskitrc() as _, self.assertRaises(ValueError) as err: + with temporary_account_config_file() as _, self.assertRaises(ValueError) as err: _ = FakeRuntimeService(name=name) self.assertIn(name, str(err.exception)) def test_enable_account_bad_auth(self): """Test initializing account by bad name.""" auth = "phantom" - with custom_qiskitrc() as _, self.assertRaises(ValueError) as err: + with temporary_account_config_file() as _, self.assertRaises(ValueError) as err: _ = FakeRuntimeService(auth=auth) self.assertIn("auth", str(err.exception)) @@ -215,7 +370,9 @@ def test_enable_account_by_name_pref(self): ] for extra in subtests: with self.subTest(extra=extra): - with custom_qiskitrc(name=name, verify=True, proxies="some proxies"): + with temporary_account_config_file( + name=name, verify=True, proxies="some proxies" + ): service = FakeRuntimeService(name=name, **extra) self.assertTrue(service._account) self._verify_prefs(extra, service._account) @@ -230,9 +387,13 @@ def test_enable_account_by_auth_pref(self): ] for auth in ["cloud", "legacy"]: for extra in subtests: - with self.subTest(auth=auth, extra=extra), custom_qiskitrc( + with self.subTest( + auth=auth, extra=extra + ), temporary_account_config_file( auth=auth, verify=True, proxies="some proxies" - ), no_envs(["QISKIT_IBM_API_TOKEN"]): + ), no_envs( + ["QISKIT_IBM_API_TOKEN"] + ): service = FakeRuntimeService(auth=auth, **extra) self.assertTrue(service._account) self._verify_prefs(extra, service._account) @@ -264,7 +425,7 @@ def test_enable_account_by_name_input_instance(self): """Test initializing account by name and input instance.""" name = "foo" instance = uuid.uuid4().hex - with custom_qiskitrc(name=name, instance=""): + with temporary_account_config_file(name=name, instance=""): service = FakeRuntimeService(name=name, instance=instance) self.assertTrue(service._account) self.assertEqual(service._account.instance, instance) @@ -272,7 +433,7 @@ def test_enable_account_by_name_input_instance(self): def test_enable_account_by_auth_input_instance(self): """Test initializing account by auth and input instance.""" instance = uuid.uuid4().hex - with custom_qiskitrc(auth="cloud", instance=""): + with temporary_account_config_file(auth="cloud", instance=""): service = FakeRuntimeService(auth="cloud", instance=instance) self.assertTrue(service._account) self.assertEqual(service._account.instance, instance) diff --git a/test/utils/account.py b/test/utils/account.py index ccebd8a883..ddb1f8fd9b 100644 --- a/test/utils/account.py +++ b/test/utils/account.py @@ -12,8 +12,8 @@ """Context managers for using with IBM Provider unit tests.""" -import os import json +import os import uuid from contextlib import ContextDecorator from tempfile import NamedTemporaryFile @@ -23,7 +23,6 @@ from qiskit_ibm_runtime.accounts.account import CLOUD_API_URL, LEGACY_API_URL from qiskit_ibm_runtime.credentials.environ import VARIABLES_MAP - CREDENTIAL_ENV_VARS = VARIABLES_MAP.keys() @@ -102,35 +101,34 @@ def side_effect(self, filename_): return self.isfile_original(filename_) -class custom_qiskitrc(ContextDecorator): +class temporary_account_config_file(ContextDecorator): """Context manager that uses a temporary qiskitrc.""" # pylint: disable=invalid-name def __init__(self, contents=None, **kwargs): # Create a temporary file with the contents. - contents = contents or get_qiskitrc_contents(**kwargs) + contents = ( + contents if contents is not None else get_account_config_contents(**kwargs) + ) + self.tmp_file = NamedTemporaryFile(mode="w+") json.dump(contents, self.tmp_file) self.tmp_file.flush() - self.default_qiskitrc_file_original = ( - management._DEFAULT_ACCOUNG_CONFIG_JSON_FILE - ) + self.account_config_json_backup = management._DEFAULT_ACCOUNT_CONFIG_JSON_FILE def __enter__(self): - # Temporarily modify the default location of the qiskitrc file. - management._DEFAULT_ACCOUNG_CONFIG_JSON_FILE = self.tmp_file.name + # Temporarily modify the default location of the configuration file. + management._DEFAULT_ACCOUNT_CONFIG_JSON_FILE = self.tmp_file.name return self def __exit__(self, *exc): # Delete the temporary file and restore the default location. self.tmp_file.close() - management._DEFAULT_ACCOUNG_CONFIG_JSON_FILE = ( - self.default_qiskitrc_file_original - ) + management._DEFAULT_ACCOUNT_CONFIG_JSON_FILE = self.account_config_json_backup -def get_qiskitrc_contents( +def get_account_config_contents( name=None, auth="cloud", token=None,