diff --git a/src/rhsmlib/dbus/objects/register.py b/src/rhsmlib/dbus/objects/register.py index 6bbd7abf59..193c020144 100644 --- a/src/rhsmlib/dbus/objects/register.py +++ b/src/rhsmlib/dbus/objects/register.py @@ -18,9 +18,11 @@ import threading import dbus import dbus.service +import subscription_manager.injection as inj from rhsmlib.dbus import constants, exceptions, dbus_utils, base_object, server, util from rhsmlib.services.register import RegisterService +from rhsmlib.services.unregister import UnregisterService from rhsmlib.services.attach import AttachService from rhsmlib.services.entitlement import EntitlementService @@ -251,6 +253,25 @@ def _remove_enable_content_option(options: dict) -> bool: enable_content = False return enable_content + def _unregister(self, connection_options: dict) -> None: + """ + Unregisters a system that is currently registered and + cleans the cp_provider to handle authorization after un-registering. + :param connection_options: dictionary with connection options + :return: None + """ + self.ensure_registered() + log.info("This system is already registered, attempting to un-register...") + + cp_provider = inj.require(inj.CP_PROVIDER) + + cp = self.build_uep(connection_options) + UnregisterService(cp).unregister() + + # The CPProvider object must be cleaned and the cp object must + # be re-initialized to handle authorization after un-registration. + cp_provider.clean() + @dbus.service.method( dbus_interface=constants.PRIVATE_REGISTER_INTERFACE, in_signature='sssa{sv}a{sv}s', @@ -268,9 +289,6 @@ def Register(self, org, username, password, options, connection_options, locale) Note this method is registration ONLY. Auto-attach is a separate process. """ - if self.is_registered(): - raise dbus.DBusException("This system is already registered") - org = dbus_utils.dbus_to_python(org, expected_type=str) connection_options = dbus_utils.dbus_to_python(connection_options, expected_type=dict) connection_options['username'] = dbus_utils.dbus_to_python(username, expected_type=str) @@ -279,6 +297,16 @@ def Register(self, org, username, password, options, connection_options, locale) locale = dbus_utils.dbus_to_python(locale, expected_type=str) Locale.set(locale) + + force_registration: bool = options.get("force", False) + system_is_registered: bool = self.is_registered() + if system_is_registered and not force_registration: + raise dbus.DBusException("This system is already registered.") + + # If the system is registered and the 'force' option is specified, + # Unregister the system and register the system again. + if system_is_registered and force_registration: + self._unregister(connection_options) cp = self.build_uep(connection_options) register_service = RegisterService(cp) @@ -325,6 +353,16 @@ def RegisterWithActivationKeys(self, org, activation_keys, options, connection_o locale = dbus_utils.dbus_to_python(locale, expected_type=str) Locale.set(locale) + + force_registration: bool = options.get("force", False) + system_is_registered: bool = self.is_registered() + if system_is_registered and not force_registration: + raise dbus.DBusException("This system is already registered.") + + # If the system is registered and the 'force' option is specified, + # Unregister the system and register the system again. + if system_is_registered and force_registration: + self._unregister(connection_options) cp = self.build_uep(connection_options) register_service = RegisterService(cp) diff --git a/test/rhsmlib_test/test_register.py b/test/rhsmlib_test/test_register.py index 8821891d87..d56b1899db 100644 --- a/test/rhsmlib_test/test_register.py +++ b/test/rhsmlib_test/test_register.py @@ -772,6 +772,10 @@ def setUp(self): self.mock_register = register_patcher.start().return_value self.addCleanup(register_patcher.stop) + unregister_patcher = mock.patch('rhsmlib.dbus.objects.unregister.UnregisterService.unregister') + self.mock_unregister = unregister_patcher.start().return_value + self.addCleanup(unregister_patcher.stop) + cert_invoker_patcher = mock.patch('rhsmlib.dbus.objects.register.EntCertActionInvoker', autospec=True) self.mock_cert_invoker = cert_invoker_patcher.start().return_value self.addCleanup(cert_invoker_patcher.stop) @@ -885,6 +889,37 @@ def assertions(*args): dbus_method_args = ['admin', 'admin', 'admin', {}, {}, ''] self.dbus_request(assertions, self._build_interface().Register, dbus_method_args) + def test_cannot_register_over_domain_socket_when_already_registered(self): + expected_consumer = json.loads(CONSUMER_CONTENT_JSON) + + def assertions(*args): + # Be sure we are persisting the consumer cert + self.assertEqual(json.loads(args[0]), expected_consumer) + + self.mock_identity.is_valid.return_value = True + self.mock_identity.uuid = 'INVALIDCONSUMERUUID' + + self.mock_register.register.return_value = expected_consumer + + dbus_method_args = ['admin', 'admin', 'admin', {}, {}, ''] + with self.assertRaisesRegexp(dbus.exceptions.DBusException, r'.* system is already registered.'): + self.dbus_request(assertions, self._build_interface().Register, dbus_method_args) + + def test_can_register_over_domain_socket_with_force_option(self): + expected_consumer = json.loads(CONSUMER_CONTENT_JSON) + + def assertions(*args): + # Be sure we are persisting the consumer cert + self.assertEqual(json.loads(args[0]), expected_consumer) + + self.mock_identity.is_valid.return_value = True + self.mock_identity.uuid = 'INVALIDCONSUMERUUID' + + self.mock_register.register.return_value = expected_consumer + + dbus_method_args = ['admin', 'admin', 'admin', {'force': True}, {}, ''] + self.dbus_request(assertions, self._build_interface().Register, dbus_method_args) + def test_can_register_over_domain_socket_no_enabled_content(self): """ Test calling Register method with argument "enable_content", when @@ -998,3 +1033,56 @@ def assertions(*args): ] self.dbus_request(assertions, self._build_interface().RegisterWithActivationKeys, dbus_method_args) + + def test_cannot_register_over_domain_socket_with_activation_keys_when_already_registered(self): + expected_consumer = json.loads(CONSUMER_CONTENT_JSON) + + def assertions(*args): + # Be sure we are persisting the consumer cert + self.assertEqual(json.loads(args[0]), expected_consumer) + + self.mock_identity.is_valid.return_value = True + self.mock_identity.uuid = 'INVALIDCONSUMERUUID' + + self.mock_register.register.return_value = expected_consumer + + dbus_method_args = [ + 'admin', + ['key1', 'key2'], + {}, + { + 'host': 'localhost', + 'port': '8443', + 'handler': '/candlepin' + }, + '' + ] + + with self.assertRaisesRegexp(dbus.exceptions.DBusException, r'.* system is already registered.'): + self.dbus_request(assertions, self._build_interface().RegisterWithActivationKeys, dbus_method_args) + + def test_can_register_over_domain_socket_with_activation_keys_and_force_option(self): + expected_consumer = json.loads(CONSUMER_CONTENT_JSON) + + def assertions(*args): + # Be sure we are persisting the consumer cert + self.assertEqual(json.loads(args[0]), expected_consumer) + + self.mock_identity.is_valid.return_value = True + self.mock_identity.uuid = 'INVALIDCONSUMERUUID' + + self.mock_register.register.return_value = expected_consumer + + dbus_method_args = [ + 'admin', + ['key1', 'key2'], + {'force': True}, + { + 'host': 'localhost', + 'port': '8443', + 'handler': '/candlepin' + }, + '' + ] + + self.dbus_request(assertions, self._build_interface().RegisterWithActivationKeys, dbus_method_args)