diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 85eb94e6cd9..3a68be4216f 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -29,6 +29,7 @@ jobs: matrix: mypy_packages: - "nav2_smac_planner" + - "nav2_common" steps: - uses: actions/checkout@v4 diff --git a/nav2_common/nav2_common/launch/has_node_params.py b/nav2_common/nav2_common/launch/has_node_params.py index 6e386128ce0..fa36f280785 100644 --- a/nav2_common/nav2_common/launch/has_node_params.py +++ b/nav2_common/nav2_common/launch/has_node_params.py @@ -12,13 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Text - import launch import yaml -class HasNodeParams(launch.Substitution): +class HasNodeParams(launch.Substitution): # type: ignore[misc] """ Substitution that checks if a param file contains parameters for a node. @@ -26,7 +24,7 @@ class HasNodeParams(launch.Substitution): """ def __init__( - self, source_file: launch.SomeSubstitutionsType, node_name: Text + self, source_file: launch.SomeSubstitutionsType, node_name: str ) -> None: super().__init__() """ @@ -39,21 +37,22 @@ def __init__( # import here to avoid loop from launch.utilities import normalize_to_list_of_substitutions - self.__source_file = normalize_to_list_of_substitutions(source_file) + self.__source_file: list[launch.Substitution] = \ + normalize_to_list_of_substitutions(source_file) self.__node_name = node_name @property - def name(self) -> List[launch.Substitution]: + def name(self) -> list[launch.Substitution]: """Getter for name.""" return self.__source_file - def describe(self) -> Text: + def describe(self) -> str: """Return a description of this substitution as a string.""" return '' - def perform(self, context: launch.LaunchContext) -> Text: + def perform(self, context: launch.LaunchContext) -> str: yaml_filename = launch.utilities.perform_substitutions(context, self.name) - data = yaml.safe_load(open(yaml_filename, 'r')) + data = yaml.safe_load(open(yaml_filename)) if self.__node_name in data.keys(): return 'True' diff --git a/nav2_common/nav2_common/launch/replace_string.py b/nav2_common/nav2_common/launch/replace_string.py index cb55b883380..e379c045457 100644 --- a/nav2_common/nav2_common/launch/replace_string.py +++ b/nav2_common/nav2_common/launch/replace_string.py @@ -13,12 +13,12 @@ # limitations under the License. import tempfile -from typing import Dict, List, Optional, Text +from typing import Optional import launch -class ReplaceString(launch.Substitution): +class ReplaceString(launch.Substitution): # type: ignore[misc] """ Substitution that replaces strings on a given file. @@ -28,7 +28,7 @@ class ReplaceString(launch.Substitution): def __init__( self, source_file: launch.SomeSubstitutionsType, - replacements: Dict, + replacements: dict[str, launch.SomeSubstitutionsType], condition: Optional[launch.Condition] = None, ) -> None: super().__init__() @@ -37,7 +37,8 @@ def __init__( # import here to avoid loop - self.__source_file = normalize_to_list_of_substitutions(source_file) + self.__source_file: list[launch.Substitution] = \ + normalize_to_list_of_substitutions(source_file) self.__replacements = {} for key in replacements: self.__replacements[key] = normalize_to_list_of_substitutions( @@ -46,7 +47,7 @@ def __init__( self.__condition = condition @property - def name(self) -> List[launch.Substitution]: + def name(self) -> list[launch.Substitution]: """Getter for name.""" return self.__source_file @@ -55,17 +56,19 @@ def condition(self) -> Optional[launch.Condition]: """Getter for condition.""" return self.__condition - def describe(self) -> Text: + def describe(self) -> str: """Return a description of this substitution as a string.""" return '' - def perform(self, context: launch.LaunchContext) -> Text: - yaml_filename = launch.utilities.perform_substitutions(context, self.name) + def perform(self, context: launch.LaunchContext) -> str: + yaml_filename: str = launch.utilities.perform_substitutions( + context, self.name + ) if self.__condition is None or self.__condition.evaluate(context): output_file = tempfile.NamedTemporaryFile(mode='w', delete=False) replacements = self.resolve_replacements(context) try: - input_file = open(yaml_filename, 'r') + input_file = open(yaml_filename) self.replace(input_file, output_file, replacements) except Exception as err: # noqa: B902 print('ReplaceString substitution error: ', err) @@ -76,7 +79,7 @@ def perform(self, context: launch.LaunchContext) -> Text: else: return yaml_filename - def resolve_replacements(self, context): + def resolve_replacements(self, context: launch.LaunchContext) -> dict[str, str]: resolved_replacements = {} for key in self.__replacements: resolved_replacements[key] = launch.utilities.perform_substitutions( @@ -84,7 +87,8 @@ def resolve_replacements(self, context): ) return resolved_replacements - def replace(self, input_file, output_file, replacements): + def replace(self, input_file: launch.SomeSubstitutionsType, + output_file: launch.SomeSubstitutionsType, replacements: dict[str, str]) -> None: for line in input_file: for key, value in replacements.items(): if isinstance(key, str) and isinstance(value, str): diff --git a/nav2_common/nav2_common/launch/rewritten_yaml.py b/nav2_common/nav2_common/launch/rewritten_yaml.py index 5434f611f51..49ec39b5377 100644 --- a/nav2_common/nav2_common/launch/rewritten_yaml.py +++ b/nav2_common/nav2_common/launch/rewritten_yaml.py @@ -12,27 +12,30 @@ # See the License for the specific language governing permissions and # limitations under the License. +from collections.abc import Generator import tempfile -from typing import Dict, List, Optional, Text +from typing import Optional, TypeAlias, Union import launch import yaml +YamlValue: TypeAlias = Union[str, int, float, bool] + class DictItemReference: - def __init__(self, dictionary, key): + def __init__(self, dictionary: dict[str, YamlValue], key: str): self.dictionary = dictionary self.dictKey = key - def key(self): + def key(self) -> str: return self.dictKey - def setValue(self, value): + def setValue(self, value: YamlValue) -> None: self.dictionary[self.dictKey] = value -class RewrittenYaml(launch.Substitution): +class RewrittenYaml(launch.Substitution): # type: ignore[misc] """ Substitution that modifies the given YAML file. @@ -42,10 +45,10 @@ class RewrittenYaml(launch.Substitution): def __init__( self, source_file: launch.SomeSubstitutionsType, - param_rewrites: Dict, + param_rewrites: dict[str, launch.SomeSubstitutionsType], root_key: Optional[launch.SomeSubstitutionsType] = None, - key_rewrites: Optional[Dict] = None, - convert_types=False, + key_rewrites: Optional[dict[str, launch.SomeSubstitutionsType]] = None, + convert_types: bool = False, ) -> None: super().__init__() """ @@ -61,7 +64,8 @@ def __init__( # import here to avoid loop from launch.utilities import normalize_to_list_of_substitutions - self.__source_file = normalize_to_list_of_substitutions(source_file) + self.__source_file: list[launch.Substitution] = \ + normalize_to_list_of_substitutions(source_file) self.__param_rewrites = {} self.__key_rewrites = {} self.__convert_types = convert_types @@ -79,19 +83,19 @@ def __init__( self.__root_key = normalize_to_list_of_substitutions(root_key) @property - def name(self) -> List[launch.Substitution]: + def name(self) -> list[launch.Substitution]: """Getter for name.""" return self.__source_file - def describe(self) -> Text: + def describe(self) -> str: """Return a description of this substitution as a string.""" return '' - def perform(self, context: launch.LaunchContext) -> Text: + def perform(self, context: launch.LaunchContext) -> str: yaml_filename = launch.utilities.perform_substitutions(context, self.name) rewritten_yaml = tempfile.NamedTemporaryFile(mode='w', delete=False) param_rewrites, keys_rewrites = self.resolve_rewrites(context) - data = yaml.safe_load(open(yaml_filename, 'r')) + data = yaml.safe_load(open(yaml_filename)) self.substitute_params(data, param_rewrites) self.add_params(data, param_rewrites) self.substitute_keys(data, keys_rewrites) @@ -103,7 +107,8 @@ def perform(self, context: launch.LaunchContext) -> Text: rewritten_yaml.close() return rewritten_yaml.name - def resolve_rewrites(self, context): + def resolve_rewrites(self, context: launch.LaunchContext) -> \ + tuple[dict[str, str], dict[str, str]]: resolved_params = {} for key in self.__param_rewrites: resolved_params[key] = launch.utilities.perform_substitutions( @@ -116,7 +121,8 @@ def resolve_rewrites(self, context): ) return resolved_params, resolved_keys - def substitute_params(self, yaml, param_rewrites): + def substitute_params(self, yaml: dict[str, YamlValue], + param_rewrites: dict[str, str]) -> None: # substitute leaf-only parameters for key in self.getYamlLeafKeys(yaml): if key.key() in param_rewrites: @@ -132,7 +138,8 @@ def substitute_params(self, yaml, param_rewrites): yaml_keys = path.split('.') yaml = self.updateYamlPathVals(yaml, yaml_keys, rewrite_val) - def add_params(self, yaml, param_rewrites): + def add_params(self, yaml: dict[str, YamlValue], + param_rewrites: dict[str, str]) -> None: # add new total path parameters yaml_paths = self.pathify(yaml) for path in param_rewrites: @@ -142,7 +149,10 @@ def add_params(self, yaml, param_rewrites): if 'ros__parameters' in yaml_keys: yaml = self.updateYamlPathVals(yaml, yaml_keys, new_val) - def updateYamlPathVals(self, yaml, yaml_key_list, rewrite_val): + def updateYamlPathVals( + self, yaml: dict[str, YamlValue], + yaml_key_list: list[str], rewrite_val: YamlValue) -> dict[str, YamlValue]: + for key in yaml_key_list: if key == yaml_key_list[-1]: yaml[key] = rewrite_val @@ -153,12 +163,15 @@ def updateYamlPathVals(self, yaml, yaml_key_list, rewrite_val): yaml[int(key)], yaml_key_list, rewrite_val ) else: - yaml[key] = self.updateYamlPathVals( - yaml.get(key, {}), yaml_key_list, rewrite_val + yaml[key] = self.updateYamlPathVals( # type: ignore[assignment] + yaml.get(key, {}), # type: ignore[arg-type] + yaml_key_list, + rewrite_val ) return yaml - def substitute_keys(self, yaml, key_rewrites): + def substitute_keys( + self, yaml: dict[str, YamlValue], key_rewrites: dict[str, str]) -> None: if len(key_rewrites) != 0: for key in list(yaml.keys()): val = yaml[key] @@ -169,20 +182,31 @@ def substitute_keys(self, yaml, key_rewrites): if isinstance(val, dict): self.substitute_keys(val, key_rewrites) - def getYamlLeafKeys(self, yamlData): - try: - for key in yamlData.keys(): - for k in self.getYamlLeafKeys(yamlData[key]): - yield k - yield DictItemReference(yamlData, key) - except AttributeError: + def getYamlLeafKeys(self, yamlData: dict[str, YamlValue]) -> \ + Generator[DictItemReference, None, None]: + if not isinstance(yamlData, dict): return - def pathify(self, d, p=None, paths=None, joinchar='.'): + for key in yamlData.keys(): + child = yamlData[key] + + if isinstance(child, dict): + # Recursively process nested dictionaries + yield from self.getYamlLeafKeys(child) + + yield DictItemReference(yamlData, key) + + def pathify( + self, d: Union[dict[str, YamlValue], list[YamlValue], YamlValue], + p: Optional[str] = None, + paths: Optional[dict[str, YamlValue]] = None, + joinchar: str = '.') -> dict[str, YamlValue]: if p is None: paths = {} self.pathify(d, '', paths, joinchar=joinchar) return paths + + assert paths is not None pn = p if p != '': pn += joinchar @@ -195,8 +219,9 @@ def pathify(self, d, p=None, paths=None, joinchar='.'): self.pathify(e, pn + str(idx), paths, joinchar=joinchar) else: paths[p] = d + return paths - def convert(self, text_value): + def convert(self, text_value: str) -> YamlValue: if self.__convert_types: # try converting to int or float try: diff --git a/tools/pyproject.toml b/tools/pyproject.toml index 663c8243821..ab81e85d120 100644 --- a/tools/pyproject.toml +++ b/tools/pyproject.toml @@ -18,5 +18,10 @@ explicit_package_bases = true strict = true [[tool.mypy.overrides]] -module = ["matplotlib.*", "rtree.*"] +module = [ + "matplotlib.*", + "rtree.*", + "launch.*", +] + ignore_missing_imports = true