Skip to content

Commit

Permalink
Merge pull request #2976 from dcermak/type-hints
Browse files Browse the repository at this point in the history
Type hints & misc smaller fixes
  • Loading branch information
Vogtinator committed Jul 10, 2023
2 parents b1b5536 + 1703348 commit 395857d
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 78 deletions.
43 changes: 31 additions & 12 deletions ReviewBot.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#!/usr/bin/python3

from enum import Enum, unique
import os
import sys
import re
import logging
from typing import List
from typing import Generator, List, Optional, Tuple, Union
import cmdln
from collections import namedtuple
from collections import OrderedDict
Expand Down Expand Up @@ -66,6 +67,16 @@ def _load_lookup_file(self, prj):
return None


@unique
class ReviewChoices(Enum):
NORMAL = 'normal'
NO = 'no'
ACCEPT = 'accept'
ACCEPT_ONPASS = 'accept-onpass'
FALLBACK_ONFAIL = 'fallback-onfail'
FALLBACK_ALWAYS = 'fallback-always'


class ReviewBot(object):
"""
A generic obs request reviewer
Expand All @@ -76,7 +87,10 @@ def check_action_<type>(self, req, action):
"""

DEFAULT_REVIEW_MESSAGES = {'accepted': 'ok', 'declined': 'review failed'}
REVIEW_CHOICES = ('normal', 'no', 'accept', 'accept-onpass', 'fallback-onfail', 'fallback-always')
REVIEW_CHOICES: Tuple[ReviewChoices, ...] = (
ReviewChoices.NORMAL, ReviewChoices.NO, ReviewChoices.ACCEPT,
ReviewChoices.ACCEPT_ONPASS, ReviewChoices.FALLBACK_ONFAIL, ReviewChoices.FALLBACK_ALWAYS
)

COMMENT_MARKER_REGEX = re.compile(r'<!-- (?P<bot>[^ ]+) state=(?P<state>[^ ]+)(?: result=(?P<result>[^ ]+))? -->')

Expand All @@ -98,7 +112,7 @@ def __init__(self, apiurl=None, dryrun=False, logger=None, user=None, group=None
self.review_group = group
self.requests: List[osc.core.Request] = []
self.review_messages = ReviewBot.DEFAULT_REVIEW_MESSAGES
self._review_mode = 'normal'
self._review_mode: ReviewChoices = ReviewChoices.NORMAL
self.fallback_user = None
self.fallback_group = None
self.comment_api = CommentAPI(self.apiurl)
Expand Down Expand Up @@ -151,13 +165,13 @@ def staging_api(self, project):
return self.staging_apis[project]

@property
def review_mode(self):
def review_mode(self) -> ReviewChoices:
return self._review_mode

@review_mode.setter
def review_mode(self, value):
def review_mode(self, value: ReviewChoices):
if value not in self.REVIEW_CHOICES:
raise Exception("invalid review option: %s" % value)
raise ValueError("invalid review option: %s" % value)
self._review_mode = value

def set_request_ids(self, ids):
Expand Down Expand Up @@ -219,7 +233,7 @@ def check_requests(self):
return return_value

@memoize(session=True)
def request_override_check_users(self, project):
def request_override_check_users(self, project: str) -> List[str]:
"""Determine users allowed to override review in a comment command."""
config = Config.get(self.apiurl, project)

Expand All @@ -235,7 +249,7 @@ def request_override_check_users(self, project):

return users

def request_override_check(self, force=False):
def request_override_check(self, force: bool = False) -> Optional[bool]:
"""Check for a comment command requesting review override."""
if not force and not self.override_allow:
return None
Expand All @@ -251,8 +265,8 @@ def request_override_check(self, force=False):
self.review_messages['declined'] = message
return False

def request_commands(self, command, who_allowed=None, request=None, action=None,
include_description=True):
def request_commands(self, command: str, who_allowed=None, request=None, action=None,
include_description=True) -> Generator[Tuple[List[str], Optional[str]], None, None]:
if not request:
request = self.request
if not action:
Expand Down Expand Up @@ -545,7 +559,7 @@ def check_action__default(self, req, a):
self.review_messages['accepted'] += ': ' + message
return self.request_default_return

def check_source_submission(self, src_project, src_package, src_rev, target_project, target_package):
def check_source_submission(self, src_project: str, src_package: str, src_rev: str, target_project: str, target_package: str) -> None:
""" default implemention does nothing """
self.logger.info("%s/%s@%s -> %s/%s" % (src_project, src_package, src_rev, target_project, target_package))
return None
Expand Down Expand Up @@ -812,7 +826,12 @@ def _check_matching_srcmd5(self, project, package, rev, history_limit=5):

return False

def request_age_wait(self, age_min=None, request=None, target_project=None):
def request_age_wait(
self,
age_min: Optional[Union[str, int, float]] = None,
request=None,
target_project: Optional[str] = None
) -> bool:
if not request:
request = self.request

Expand Down
23 changes: 16 additions & 7 deletions check_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import subprocess
import sys
import tempfile
from typing import Optional, Set
from cmdln import CmdlnOptionParser

from lxml import etree as ET

Expand Down Expand Up @@ -42,12 +44,12 @@ def __init__(self, *args, **kwargs):

self.skip_add_reviews = False

def target_project_config(self, project):
def target_project_config(self, project: str) -> None:
# Load project config and allow for remote entries.
config = Config.get(self.apiurl, project)

self.single_action_require = str2bool(config.get('check-source-single-action-require', 'False'))
self.ignore_devel = not str2bool(config.get('devel-project-enforce', 'False'))
self.ignore_devel: bool = not str2bool(config.get('devel-project-enforce', 'False'))
self.in_air_rename_allow = str2bool(config.get('check-source-in-air-rename-allow', 'False'))
self.add_review_team = str2bool(config.get('check-source-add-review-team', 'True'))
self.review_team = config.get('review-team')
Expand All @@ -57,11 +59,11 @@ def target_project_config(self, project):
self.devel_whitelist = config.get('devel-whitelist', '').split()
self.skip_add_reviews = False
self.ensure_source_exist_in_baseproject = str2bool(config.get('check-source-ensure-source-exist-in-baseproject', 'False'))
self.devel_baseproject = config.get('check-source-devel-baseproject', '')
self.devel_baseproject: str = config.get('check-source-devel-baseproject', '')
self.allow_source_in_sle = str2bool(config.get('check-source-allow-source-in-sle', 'True'))
self.sle_project_to_check = config.get('check-source-sle-project', '')
self.allow_valid_source_origin = str2bool(config.get('check-source-allow-valid-source-origin', 'False'))
self.valid_source_origins = set(config.get('check-source-valid-source-origins', '').split(' '))
self.valid_source_origins: Set[str] = set(config.get('check-source-valid-source-origins', '').split(' '))
self.add_devel_project_review = str2bool(config.get('check-source-add-devel-project-review', 'False'))
self.allowed_scm_submission_sources = config.get('allowed-scm-submission-sources', '').split()

Expand All @@ -78,7 +80,7 @@ def target_project_config(self, project):
# It might make sense to supersede maintbot, but for now.
self.skip_add_reviews = True

def is_good_name(self, package, target_package):
def is_good_name(self, package: Optional[str], target_package: Optional[str]) -> bool:
self.logger.debug(f"is_good_name {package} <-> {target_package}")
if target_package is None:
# if the name doesn't matter, existance is all
Expand Down Expand Up @@ -114,7 +116,14 @@ def package_source_parse(self, project, package, revision=None, target_package=N

return ret

def check_source_submission(self, source_project, source_package, source_revision, target_project, target_package):
def check_source_submission(
self,
source_project: str,
source_package: str,
source_revision: str,
target_project: str,
target_package: str
) -> bool:
super(CheckSource, self).check_source_submission(source_project,
source_package, source_revision, target_project, target_package)
self.target_project_config(target_project)
Expand Down Expand Up @@ -762,7 +771,7 @@ def __init__(self, *args, **kwargs):
ReviewBot.CommandLineInterface.__init__(self, args, kwargs)
self.clazz = CheckSource

def get_optparser(self):
def get_optparser(self) -> CmdlnOptionParser:
parser = ReviewBot.CommandLineInterface.get_optparser(self)

parser.add_option('--skip-add-reviews', action='store_true', default=False,
Expand Down
28 changes: 23 additions & 5 deletions origin-manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/python3

from typing import Literal, Optional, Tuple, Union
from osclib.core import devel_project_get
from osclib.core import package_source_hash
from osclib.core import package_kind
Expand Down Expand Up @@ -77,7 +78,7 @@ def check_action_delete_package(self, request, action):

return True

def check_source_submission(self, src_project, src_package, src_rev, tgt_project, tgt_package):
def check_source_submission(self, src_project, src_package, src_rev, tgt_project, tgt_package) -> Optional[bool]:
kind = package_kind(self.apiurl, tgt_project, tgt_package)
if not (kind is None or kind == 'source'):
self.review_messages['accepted'] = 'skipping {} package since not source'.format(kind)
Expand Down Expand Up @@ -118,7 +119,16 @@ def check_source_submission(self, src_project, src_package, src_rev, tgt_project
source_hash_new, source_hash_old)
return self.policy_result_handle(tgt_project, tgt_package, origin_info_new, origin_info_old, result)

def config_validate(self, target_project):
def config_validate(self, target_project: str) -> Tuple[bool, bool]:
"""Check the settings ``OSRT:OriginConfig`` of the target project and
return a tuple of booleans. The first boolean indicates whether the
action should proceed and the second whether the config is valid.
This function checks whether the value
``OSRT:OriginConfig.fallback-group`` is present and whether
``OSRT:OriginConfig.review-user`` matches the configured review_user.
"""
config = config_load(self.apiurl, target_project)
if not config:
# No perfect solution for lack of a config. For normal projects a
Expand All @@ -138,7 +148,11 @@ def config_validate(self, target_project):

return True, True

def devel_project_simulate_check(self, source_project, target_project):
def devel_project_simulate_check(
self,
source_project: str,
target_project: str
) -> Union[Tuple[str, str], Tuple[Literal[False], Literal[None]]]:
config = config_load(self.apiurl, target_project)
origin_list = config_origin_list(config, self.apiurl, target_project, skip_workarounds=True)

Expand All @@ -157,7 +171,11 @@ def devel_project_simulate_check(self, source_project, target_project):

return False, None

def devel_project_simulate_check_command(self, source_project, target_project):
def devel_project_simulate_check_command(
self,
source_project: str,
target_project: str
) -> Union[Tuple[str, Optional[str]], Tuple[Literal[False], Literal[None]]]:
who_allowed = self.request_override_check_users(target_project)
if self.request.creator not in who_allowed:
who_allowed.append(self.request.creator)
Expand All @@ -168,7 +186,7 @@ def devel_project_simulate_check_command(self, source_project, target_project):

return False, None

def policy_result_handle(self, project, package, origin_info_new, origin_info_old, result):
def policy_result_handle(self, project, package, origin_info_new, origin_info_old, result: PolicyResult) -> Optional[bool]:
if result.wait and not result.accept:
result.comments.append(f'Decision may be overridden via `@{self.review_user} override`.')

Expand Down
56 changes: 41 additions & 15 deletions osclib/comments.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,49 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Literal, Optional, Tuple, TypedDict, Union
from dateutil.parser import parse as date_parse
import re
from lxml import etree as ET

from osc.core import http_DELETE
from osc.core import http_GET
from osc.core import http_POST
if TYPE_CHECKING:
import xml.etree.ElementTree as ET
else:
from lxml import etree as ET

from osc.connection import http_DELETE
from osc.connection import http_GET
from osc.connection import http_POST
from osc.core import makeurl


class _BaseComment(TypedDict):
who: Optional[str]
when: datetime
parent: Optional[Any]
comment: Optional[str]


class Comment(_BaseComment):
id: Optional[str]


class RequestAsComment(_BaseComment):
id: Literal['-1']


class CommentAPI(object):
COMMENT_MARKER_REGEX = re.compile(r'<!-- (?P<bot>[^ ]+)(?P<info>(?: [^= ]+=[^ ]+)*) -->')

def __init__(self, apiurl):
self.apiurl = apiurl

def _prepare_url(self, request_id=None, project_name=None,
package_name=None, query=None):
def _prepare_url(self, request_id=None, project_name: Optional[str] = None,
package_name: Optional[str] = None,
query: Optional[Union[List[str], Dict[str, str]]] = None
) -> str:
"""Prepare the URL to get/put comments in OBS.
:param request_id: Request where to refer the comment.
:param project_name: Project name where to refer comment.
:param package_name: Package name where to refer the comment.
:returns: Formated URL for the request.
:returns: Formatted URL for the request.
"""
url = None
if request_id:
Expand All @@ -36,21 +57,20 @@ def _prepare_url(self, request_id=None, project_name=None,
raise ValueError('Please, set request_id, project_name or / and package_name to add a comment.')
return url

def _comment_as_dict(self, comment_element):
def _comment_as_dict(self, comment_element: ET.Element) -> Comment:
"""Convert an XML element comment into a dictionary.
:param comment_element: XML element that store a comment.
:returns: A Python dictionary object.
"""
comment = {
return {
'who': comment_element.get('who'),
'when': datetime.strptime(comment_element.get('when'), '%Y-%m-%d %H:%M:%S %Z'),
'when': datetime.strptime(comment_element.get('when', ''), '%Y-%m-%d %H:%M:%S %Z'),
'id': comment_element.get('id'),
'parent': comment_element.get('parent', None),
'comment': comment_element.text,
}
return comment

def request_as_comment_dict(self, request):
def request_as_comment_dict(self, request) -> RequestAsComment:
return {
'who': request.creator,
'when': date_parse(request.statehistory[0].when),
Expand All @@ -60,7 +80,7 @@ def request_as_comment_dict(self, request):
}

def get_comments(self, request_id=None, project_name=None,
package_name=None):
package_name=None) -> Dict[str, Comment]:
"""Get the list of comments of an object in OBS.
:param request_id: Request where to get comments.
Expand Down Expand Up @@ -106,7 +126,13 @@ def comment_find(self, comments, bot, info_match=None):
return c, info
return None, None

def command_find(self, comments, user, command=None, who_allowed=None):
def command_find(
self,
comments: Dict[str, Comment],
user: str,
command: Optional[str] = None,
who_allowed=None
) -> Generator[Tuple[List[str], Optional[str]], None, None]:
"""
Find comment commands with the optional conditions.
Expand Down
Loading

0 comments on commit 395857d

Please sign in to comment.