Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions src/rhsmlib/dbus/objects/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import threading
import dbus
import dbus.service
import subscription_manager.injection as inj

from rhsmlib.dbus import constants, exceptions, dbus_utils, base_object, server, util
from rhsmlib.services.register import RegisterService
from rhsmlib.services.unregister import UnregisterService
from rhsmlib.services.attach import AttachService
from rhsmlib.services.entitlement import EntitlementService

Expand Down Expand Up @@ -251,6 +253,25 @@ def _remove_enable_content_option(options: dict) -> bool:
enable_content = False
return enable_content

def _unregister(self, connection_options: dict) -> None:
"""
Unregisters a system that is currently registered and
cleans the cp_provider to handle authorization after un-registering.
:param connection_options: dictionary with connection options
:return: None
"""
self.ensure_registered()
log.info("This system is already registered, attempting to un-register...")

cp_provider = inj.require(inj.CP_PROVIDER)

cp = self.build_uep(connection_options)
UnregisterService(cp).unregister()

# The CPProvider object must be cleaned and the cp object must
# be re-initialized to handle authorization after un-registration.
cp_provider.clean()

@dbus.service.method(
dbus_interface=constants.PRIVATE_REGISTER_INTERFACE,
in_signature='sssa{sv}a{sv}s',
Expand All @@ -268,9 +289,6 @@ def Register(self, org, username, password, options, connection_options, locale)

Note this method is registration ONLY. Auto-attach is a separate process.
"""
if self.is_registered():
raise dbus.DBusException("This system is already registered")

org = dbus_utils.dbus_to_python(org, expected_type=str)
connection_options = dbus_utils.dbus_to_python(connection_options, expected_type=dict)
connection_options['username'] = dbus_utils.dbus_to_python(username, expected_type=str)
Expand All @@ -279,6 +297,16 @@ def Register(self, org, username, password, options, connection_options, locale)
locale = dbus_utils.dbus_to_python(locale, expected_type=str)

Locale.set(locale)

force_registration: bool = options.get("force", False)
system_is_registered: bool = self.is_registered()
if system_is_registered and not force_registration:
raise dbus.DBusException("This system is already registered.")

# If the system is registered and the 'force' option is specified,
# Unregister the system and register the system again.
if system_is_registered and force_registration:
self._unregister(connection_options)
cp = self.build_uep(connection_options)

register_service = RegisterService(cp)
Expand Down Expand Up @@ -325,6 +353,16 @@ def RegisterWithActivationKeys(self, org, activation_keys, options, connection_o
locale = dbus_utils.dbus_to_python(locale, expected_type=str)

Locale.set(locale)

force_registration: bool = options.get("force", False)
system_is_registered: bool = self.is_registered()
if system_is_registered and not force_registration:
raise dbus.DBusException("This system is already registered.")

# If the system is registered and the 'force' option is specified,
# Unregister the system and register the system again.
if system_is_registered and force_registration:
self._unregister(connection_options)
cp = self.build_uep(connection_options)

register_service = RegisterService(cp)
Expand Down
88 changes: 88 additions & 0 deletions test/rhsmlib_test/test_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,10 @@ def setUp(self):
self.mock_register = register_patcher.start().return_value
self.addCleanup(register_patcher.stop)

unregister_patcher = mock.patch('rhsmlib.dbus.objects.unregister.UnregisterService.unregister')
self.mock_unregister = unregister_patcher.start().return_value
self.addCleanup(unregister_patcher.stop)

cert_invoker_patcher = mock.patch('rhsmlib.dbus.objects.register.EntCertActionInvoker', autospec=True)
self.mock_cert_invoker = cert_invoker_patcher.start().return_value
self.addCleanup(cert_invoker_patcher.stop)
Expand Down Expand Up @@ -885,6 +889,37 @@ def assertions(*args):
dbus_method_args = ['admin', 'admin', 'admin', {}, {}, '']
self.dbus_request(assertions, self._build_interface().Register, dbus_method_args)

def test_cannot_register_over_domain_socket_when_already_registered(self):
expected_consumer = json.loads(CONSUMER_CONTENT_JSON)

def assertions(*args):
# Be sure we are persisting the consumer cert
self.assertEqual(json.loads(args[0]), expected_consumer)

self.mock_identity.is_valid.return_value = True
self.mock_identity.uuid = 'INVALIDCONSUMERUUID'

self.mock_register.register.return_value = expected_consumer

dbus_method_args = ['admin', 'admin', 'admin', {}, {}, '']
with self.assertRaisesRegexp(dbus.exceptions.DBusException, r'.* system is already registered.'):
self.dbus_request(assertions, self._build_interface().Register, dbus_method_args)

def test_can_register_over_domain_socket_with_force_option(self):
expected_consumer = json.loads(CONSUMER_CONTENT_JSON)

def assertions(*args):
# Be sure we are persisting the consumer cert
self.assertEqual(json.loads(args[0]), expected_consumer)

self.mock_identity.is_valid.return_value = True
self.mock_identity.uuid = 'INVALIDCONSUMERUUID'

self.mock_register.register.return_value = expected_consumer

dbus_method_args = ['admin', 'admin', 'admin', {'force': True}, {}, '']
self.dbus_request(assertions, self._build_interface().Register, dbus_method_args)

def test_can_register_over_domain_socket_no_enabled_content(self):
"""
Test calling Register method with argument "enable_content", when
Expand Down Expand Up @@ -998,3 +1033,56 @@ def assertions(*args):
]

self.dbus_request(assertions, self._build_interface().RegisterWithActivationKeys, dbus_method_args)

def test_cannot_register_over_domain_socket_with_activation_keys_when_already_registered(self):
expected_consumer = json.loads(CONSUMER_CONTENT_JSON)

def assertions(*args):
# Be sure we are persisting the consumer cert
self.assertEqual(json.loads(args[0]), expected_consumer)

self.mock_identity.is_valid.return_value = True
self.mock_identity.uuid = 'INVALIDCONSUMERUUID'

self.mock_register.register.return_value = expected_consumer

dbus_method_args = [
'admin',
['key1', 'key2'],
{},
{
'host': 'localhost',
'port': '8443',
'handler': '/candlepin'
},
''
]

with self.assertRaisesRegexp(dbus.exceptions.DBusException, r'.* system is already registered.'):
self.dbus_request(assertions, self._build_interface().RegisterWithActivationKeys, dbus_method_args)

def test_can_register_over_domain_socket_with_activation_keys_and_force_option(self):
expected_consumer = json.loads(CONSUMER_CONTENT_JSON)

def assertions(*args):
# Be sure we are persisting the consumer cert
self.assertEqual(json.loads(args[0]), expected_consumer)

self.mock_identity.is_valid.return_value = True
self.mock_identity.uuid = 'INVALIDCONSUMERUUID'

self.mock_register.register.return_value = expected_consumer

dbus_method_args = [
'admin',
['key1', 'key2'],
{'force': True},
{
'host': 'localhost',
'port': '8443',
'handler': '/candlepin'
},
''
]

self.dbus_request(assertions, self._build_interface().RegisterWithActivationKeys, dbus_method_args)