Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/plugins/dnf/product-id.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#

import logging
from typing import Set

from subscription_manager.productid import ProductManager
from subscription_manager.utils import chroot
Expand Down Expand Up @@ -202,7 +203,7 @@ def write_productid_cache(self, product_ids):
def read_productid_cache(self):
return self.__read_cache_file(self.PRODUCTID_CACHE_FILE)

def get_active(self):
def get_active(self) -> Set[str]:
"""
Find the list of repos that provide packages that are actually installed.
"""
Expand Down
16 changes: 8 additions & 8 deletions src/rhsm/certificate2.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ def _read_alt_name(self, x509) -> str:
else:
return alt_name.decode("utf-8")

def _read_issuer(self, x509: _certificate.X509) -> str:
def _read_issuer(self, x509: _certificate.X509) -> dict:
return x509.get_issuer()

def _read_subject(self, x509: _certificate.X509) -> str:
def _read_subject(self, x509: _certificate.X509) -> dict:
return x509.get_subject()

def _create_identity_cert(
Expand Down Expand Up @@ -504,9 +504,9 @@ def __init__(
serial: Optional[int] = None,
start: Optional[datetime.datetime] = None,
end: Optional[datetime.datetime] = None,
subject: Optional[str] = None,
subject: Optional[str] = dict,
pem: Optional[str] = None,
issuer: Optional[str] = None,
issuer: Optional[str] = dict,
):
# The rhsm._certificate X509 object for this certificate.
# WARNING: May be None in tests
Expand All @@ -531,8 +531,8 @@ def __init__(
self.valid_range = DateRange(self.start, self.end)
self.pem: Optional[str] = pem

self.subject: Optional[str] = subject
self.issuer: Optional[str] = issuer
self.subject: Optional[dict] = subject
self.issuer: Optional[dict] = issuer

def is_valid(self, on_date: Optional[datetime.datetime] = None):
gmt = datetime.datetime.utcnow()
Expand Down Expand Up @@ -615,14 +615,14 @@ class EntitlementCertificate(ProductCertificate):
def __init__(
self,
order: Optional["Order"] = None,
content: Optional["Content"] = None,
content: Optional[List["Content"]] = None,
pool: Optional["Pool"] = None,
extensions: Optional[Extensions] = None,
**kwargs,
):
ProductCertificate.__init__(self, **kwargs)
self.order: Optional[Order] = order
self.content: Optional[Content] = content
self.content: Optional[List[Content]] = content
self.pool: Optional[Pool] = pool
self.extensions: Optional[Extensions] = extensions
self._path_tree_object = None
Expand Down
6 changes: 3 additions & 3 deletions src/rhsm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import sys
import time
import traceback
from typing import Optional, Any, Union, List, Dict, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from pathlib import Path
import re
import enum
Expand Down Expand Up @@ -547,7 +547,7 @@ def __init__(self, cert_dir: str = None, **kwargs) -> None:
handler="/", cert_dir=cert_dir, user_agent=user_agent, **kwargs
)

def get_versions(self, path: str, cert_key_pairs: list = None) -> Union[dict, None]:
def get_versions(self, path: str, cert_key_pairs: Iterable[Tuple[str, str]] = None) -> Union[dict, None]:
"""
Get list of available release versions from the given path
:param path: path, where is simple text file containing supported release versions
Expand Down Expand Up @@ -1722,7 +1722,7 @@ def updatePackageProfile(self, consumer_uuid: str, pkg_dicts: dict) -> dict:
method = "/consumers/%s/packages" % self.sanitize(consumer_uuid)
return self.conn.request_put(method, pkg_dicts, description=_("Updating profile information"))

def updateCombinedProfile(self, consumer_uuid: str, profile: dict) -> dict:
def updateCombinedProfile(self, consumer_uuid: str, profile: List[Dict]) -> dict:
"""
Updates the costumers' combined profile containing package profile,
enabled repositories and dnf modules.
Expand Down
2 changes: 1 addition & 1 deletion src/rhsmlib/facts/cloud_facts.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def get_aws_facts(self) -> Dict[str, Union[str, None]]:

facts: Dict[str, Union[str, None]] = {}
if metadata_str is not None:
values: dict[str, Union[str, None]] = self.parse_json_content(metadata_str)
values: Dict[str, Union[str, None]] = self.parse_json_content(metadata_str)

# Add these three attributes to system facts
if "instanceId" in values:
Expand Down
9 changes: 5 additions & 4 deletions src/rhsmlib/file_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
import threading
from typing import Dict

from rhsm.config import get_config_parser
from rhsmlib.services import config
Expand Down Expand Up @@ -55,14 +56,14 @@ class FilesystemWatcher(object):
# Timeout of loop in milliseconds
TIMEOUT = 2000

def __init__(self, dir_watches):
def __init__(self, dir_watches: Dict[str, "DirectoryWatch"]):
"""
:param dir_watches: dictionary of directories to watch (see DirectoryWatch class below)
"""
self.dir_watches = dir_watches
self.should_stop = False
self.dir_watches: Dict[str, DirectoryWatch] = dir_watches
self.should_stop: bool = False

def stop(self):
def stop(self) -> None:
"""
Calling this method stops infinity loop of FileSystemWatcher
"""
Expand Down
3 changes: 2 additions & 1 deletion src/rhsmlib/services/syspurpose.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"""

import logging
from typing import Dict

from rhsm.connection import UEPConnection

Expand All @@ -41,7 +42,7 @@ def __init__(self, cp: UEPConnection) -> None:
self.owner = None
self.valid_fields = None

def get_syspurpose_status(self, on_date: str = None) -> str:
def get_syspurpose_status(self, on_date: str = None) -> Dict:
"""
Get syspurpose status from candlepin server
:param on_date: Date of the status
Expand Down
34 changes: 23 additions & 11 deletions src/subscription_manager/action_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
# in this software or its documentation.
#
import logging
from typing import List, TYPE_CHECKING

from subscription_manager import base_action_client

if TYPE_CHECKING:
from subscription_manager.certlib import BaseActionInvoker

from subscription_manager.entcertlib import EntCertActionInvoker
from subscription_manager.identitycertlib import IdentityCertActionInvoker
from subscription_manager.healinglib import HealingActionInvoker
Expand All @@ -33,7 +37,7 @@


class ActionClient(base_action_client.BaseActionClient):
def _get_libset(self):
def _get_libset(self) -> List["BaseActionInvoker"]:

# TODO: replace with FSM thats progress through this async and wait/joins if needed
self.entcertlib = EntCertActionInvoker()
Expand All @@ -47,7 +51,7 @@ def _get_libset(self):
# WARNING: order is important here, we need to update a number
# of things before attempting to autoheal, and we need to autoheal
# before attempting to fetch our certificates:
lib_set = [
lib_set: List[BaseActionInvoker] = [
self.entcertlib,
self.idcertlib,
self.content_client,
Expand All @@ -61,47 +65,55 @@ def _get_libset(self):


class HealingActionClient(base_action_client.BaseActionClient):
def _get_libset(self):
def _get_libset(self) -> List["BaseActionInvoker"]:

self.entcertlib = EntCertActionInvoker()
self.installedprodlib = InstalledProductsActionInvoker()
self.syspurposelib = SyspurposeSyncActionInvoker()
self.healinglib = HealingActionInvoker()

lib_set = [self.installedprodlib, self.syspurposelib, self.healinglib, self.entcertlib]
lib_set: List[BaseActionInvoker] = [
self.installedprodlib,
self.syspurposelib,
self.healinglib,
self.entcertlib,
]

return lib_set


# it may make more sense to have *Lib.cleanup actions?
# *Lib things are weird, since some are idempotent, but
# some arent. entcertlib/repolib .update can both install
# some are not. entcertlib/repolib .update can both install
# certs, and/or delete all of them.
class UnregisterActionClient(base_action_client.BaseActionClient):
"""CertManager for cleaning up on unregister.

This class should not need a consumer id, or a uep connection, since it
This class should not need a consumer id nor an UEP connection, since it
is running post unregister.
"""

def _get_libset(self):
def _get_libset(self) -> List["BaseActionInvoker"]:

self.entcertlib = EntCertActionInvoker()
self.content_action_client = ContentActionClient()

lib_set = [self.entcertlib, self.content_action_client]
lib_set: List[BaseActionInvoker] = [
self.entcertlib,
self.content_action_client,
]
return lib_set


class ProfileActionClient(base_action_client.BaseActionClient):
"""
This class should not need a consumer id, or a uep connection, since it
This class should not need a consumer id nor an UEP connection, since it
is running post unregister.
"""

def _get_libset(self):
def _get_libset(self) -> List["BaseActionInvoker"]:

self.profilelib = PackageProfileActionInvoker()

lib_set = [self.profilelib]
lib_set: List[BaseActionInvoker] = [self.profilelib]
return lib_set
53 changes: 28 additions & 25 deletions src/subscription_manager/base_action_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,55 @@
# in this software or its documentation.
#
import logging
from typing import List, TYPE_CHECKING

from rhsm.connection import GoneException, ExpiredIdentityCertException

from subscription_manager import injection as inj

from rhsm.connection import GoneException, ExpiredIdentityCertException
if TYPE_CHECKING:
from subscription_manager.certlib import BaseActionInvoker, ActionReport
from subscription_manager.lock import ActionLock

log = logging.getLogger(__name__)


class BaseActionClient(object):
"""
An object used to update the certficates, yum repos, and facts for the system.
An object used to update the certificates, DNF repos, and facts for the system.
"""

def __init__(self, skips=None):
# FIXME Default for skips should be []
def __init__(self, skips: List[type("ActionReport")] = None):

self._libset = self._get_libset()
self.lock = inj.require(inj.ACTION_LOCK)
self.report = None
self.update_reports = []
self.skips = skips or []
self._libset: List[BaseActionInvoker] = self._get_libset()
self.lock: ActionLock = inj.require(inj.ACTION_LOCK)
# FIXME `report` attribute is not used here
self.report: ActionReport = None
self.update_reports: List[ActionReport] = []
self.skips: List[type(ActionReport)] = skips or []

def _get_libset(self):
def _get_libset(self) -> List["BaseActionInvoker"]:
# FIXME (?) Raise NotImplementedError, to ensure each subclass is using its own function
return []

def update(self, autoheal=False):
def update(self, autoheal: bool = False) -> None:
"""
Update I{entitlement} certificates and corresponding
yum repositiories.
@return: A list of update reports
@rtype: list
Update I{entitlement} certificates and corresponding DNF repositories.
"""
lock = self.lock

# TODO: move to using a lock context manager
try:
lock.acquire()
self.lock.acquire()
self.update_reports = self._run_updates(autoheal)
finally:
lock.release()
self.lock.release()

def _run_update(self, lib):
update_report = None
def _run_update(self, lib: type) -> "ActionReport":
update_report: ActionReport = None

try:
update_report = lib.update()
# see bz#852706, reraise GoneException so that
# consumer cert deletion works
# see bz#852706, reraise GoneException so that consumer cert deletion works
except GoneException:
raise
# raise this so it can be exposed clearly
Expand All @@ -74,16 +76,17 @@ def _run_update(self, lib):

return update_report

def _run_updates(self, autoheal):
# FIXME `autoheal` is not used
def _run_updates(self, autoheal: bool) -> List["ActionReport"]:

update_reports = []
update_reports: List[ActionReport] = []

for lib in self._libset:
if type(lib) in self.skips:
continue

log.debug("running lib: %s" % lib)
update_report = self._run_update(lib)
update_report: ActionReport = self._run_update(lib)

# a map/dict may make more sense here
update_reports.append(update_report)
Expand Down
21 changes: 12 additions & 9 deletions src/subscription_manager/base_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#
from typing import Optional, Set, TYPE_CHECKING

if TYPE_CHECKING:
from subscription_manager.plugins import PluginConfig


class SubManPlugin(object):
Expand All @@ -19,25 +23,24 @@ class SubManPlugin(object):
Plugins need to subclass SubManPlugin() to be found
"""

name = None
conf = None
name: str = None
conf: "PluginConfig" = None
# if all_slots is set, the plugin will get registered to all slots
# it is up to the plugin to handle providing callables
all_slots = None
all_slots: Set[str] = None

# did we have hooks that match provided slots? aka
# is this plugin going to be used
found_slots_for_hooks = False
# did we have hooks that match provided slots? aka is this plugin going to be used
found_slots_for_hooks: bool = False

def __init__(self, conf=None):
def __init__(self, conf: Optional["PluginConfig"] = None):
if conf:
self.conf = conf
if self.conf is None:
raise TypeError("SubManPlugin can not be constructed with conf=None")

def __str__(self):
def __str__(self) -> str:
return self.name or self.__class__.__name__

@classmethod
def get_plugin_key(cls):
def get_plugin_key(cls) -> str:
return ".".join([cls.__module__, cls.__name__])
Loading