diff --git a/launch/launch/actions/__init__.py b/launch/launch/actions/__init__.py index ae0577e33..86d0bd9dd 100644 --- a/launch/launch/actions/__init__.py +++ b/launch/launch/actions/__init__.py @@ -19,6 +19,8 @@ from .emit_event import EmitEvent from .execute_local import ExecuteLocal from .execute_process import ExecuteProcess +from .for_loop import ForEach +from .for_loop import ForLoop from .group_action import GroupAction from .include_launch_description import IncludeLaunchDescription from .log_info import LogInfo @@ -45,6 +47,8 @@ 'EmitEvent', 'ExecuteLocal', 'ExecuteProcess', + 'ForEach', + 'ForLoop', 'GroupAction', 'IncludeLaunchDescription', 'LogInfo', diff --git a/launch/launch/actions/for_loop.py b/launch/launch/actions/for_loop.py new file mode 100644 index 000000000..ff33840df --- /dev/null +++ b/launch/launch/actions/for_loop.py @@ -0,0 +1,395 @@ +# Copyright 2025 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module for the ForLoop action.""" + +from copy import deepcopy +from typing import Any +from typing import Callable +from typing import List +from typing import Mapping +from typing import Optional +from typing import Protocol +from typing import Text + +# yaml has type annotations in typeshed, but those cannot be installed via rosdep +# since there is no definition for types-PyYAML +import yaml # type: ignore + +from ..action import Action +from ..actions.opaque_function import OpaqueFunction +from ..frontend import Entity +from ..frontend import expose_action +from ..frontend import Parser +from ..launch_context import LaunchContext +from ..launch_description_entity import LaunchDescriptionEntity +from ..logging import get_logger +from ..some_substitutions_type import SomeSubstitutionsType +from ..substitution import Substitution +from ..substitutions import ForEachVar +from ..utilities import perform_substitutions + + +@expose_action('for_each') +class ForEach(Action): + """ + Action that iterates through sets of input values and uses them to instantiate entities. + + Sets of input values are provided as semicolon-separated string YAML structures, which could be + a substitution, such as a :class:`launch.substitutions.LaunchConfiguration`. Each iteration + gets a set of input values it can use to instantiate entities. For example, the values can be + used to create the same node but under different namespaces. The number of iterations is + defined by the number of semicolon-separated sets of values. An empty string results in no + iterations, while an empty YAML structure (e.g., `'{}'`) results in an iteration with no input + values. + + When using this action directly through Python, for each iteration, the provided callback + function is called with one set of values, and should return a list of entities. The names of + the callback function parameters must match the keys in the YAML structure, and the expected + types of the parameters correspond to the types of the values in the YAML structure (YAML type + rules apply). The order of the callback function parameters does not matter. The callback + function could also use `**kwargs`. Finally, default values can be defined through default + values of callback function parameters, in which case they may be omitted from a set of values + in the YAML string. + + Simple example: + + .. code-block:: python + + def for_each(id: int, name: str): + return [ + LogInfo(msg=f"robot '{name}' id={id}"), + ] + + def generate_launch_description(): + return LaunchDescription([ + DeclareLaunchArgument( + 'robots', default_value="{name: 'robotA', id: 1};{name: 'robotB', id: 2}"), + ForEach(LaunchConfiguration('robots'), function=for_each), + ]) + + When using this action through a frontend, provide entities to be instantiated for each loop + iteration as child entities. Use a `$(for-var)` substitution (:class:`ForEachVar`) with the + name of the for-each variable, e.g., `$(for-var name)`. A default value can be provided for the + variable if it is not available for a given iteration, e.g., `$(for-var name default)`. + + Simple examples: + + .. code-block:: xml + + + + + + + + + .. code-block:: yaml + + launch: + - arg: + name: robots + default: "{name: 'robotA', id: 1};{name: 'robotB', id: 2}" + - for_each: + iter: $(var robots) + children: + - log: + message: "'$(for-var name)' id=$(for-var id)" + + The above examples would ouput the following log messages by default: + + .. code-block:: text + + 'robotA' id=1 + 'robotB' id=2 + + If the 'robots' launch argument was set to a different value: + + .. code-block:: console + + robots:="{name: 'robotC', id: 3};{name: 'robotD', id: 4};{name: 'robotE', id: 5}" + + Then it would output: + + .. code-block:: text + + 'robotC' id=3 + 'robotD' id=4 + 'robotE' id=5 + """ + + SEPARATOR = ';' + + def __init__( + self, + input_values: SomeSubstitutionsType, + *, + function: Callable[..., Optional[List[LaunchDescriptionEntity]]], + **kwargs, + ) -> None: + """ + Create a ForEach. + + :param input_values: the sets of inputs values to iterate over, provided as a + semicolon-separated list of string YAML structures (e.g., flow style YAML strings + separated by semicolons) + :param function: a function that receives values from each YAML structure and returns + entities + """ + super().__init__(**kwargs) + + from ..utilities import normalize_to_list_of_substitutions # import here to avoid loop + self._input_values = normalize_to_list_of_substitutions(input_values) + self._function = function + self._logger = get_logger(__name__) + + @property + def input_values(self) -> List[Substitution]: + return self._input_values + + @property + def function(self) -> Callable[[int], Optional[List[LaunchDescriptionEntity]]]: + return self._function + + def describe(self) -> Text: + return ( + self.__class__.__name__ + + f"(input_values='{self._input_values}', function={self._function})" + ) + + @classmethod + def parse(cls, entity: Entity, parser: Parser): + """Return `ForEach` action and kwargs for constructing it.""" + _, kwargs = super().parse(entity, parser) + input_values = entity.get_attr('values') + if input_values is not None: + kwargs['input_values'] = parser.parse_substitution(input_values) + parsed_children = [parser.parse_action(e) for e in entity.children] + + def for_each(**iteration_vars) -> List[LaunchDescriptionEntity]: + return cls._get_iteration_entities(parsed_children, iteration_vars) + kwargs['function'] = for_each + return cls, kwargs + + def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEntity]]: + # Get the for-each input values + input_values = perform_substitutions(context, self._input_values) + self._logger.debug(f'input_values={input_values}') + # Split into list of dicts + input_values_list = list(filter(None, input_values.strip().split(self.SEPARATOR))) + iteration_dicts = [yaml.safe_load(i.strip()) for i in input_values_list] + if not iteration_dicts: + self._logger.warning('no input values: will not iterate') + + entities = [] + for iteration_dict in iteration_dicts: + self._logger.debug(f'iteration: {iteration_dict}') + # Do still call the function if the YAML structure is empty + if iteration_dict is not None: + i_entities = self._function(**iteration_dict) + if i_entities: + entities.extend(i_entities) + return entities + + @classmethod + def _get_iteration_entities( + cls, + children: List[Action], + iteration_vars: Mapping[str, Any], + ) -> List[LaunchDescriptionEntity]: + return [ + # Push and pop locals to avoid having the local variables leak + OpaqueFunction(function=cls._push_locals), + # Set local variables so that they can be used as iteration-specific values by the + # child entities through substitutions + OpaqueFunction(function=cls._set_args_local, args=(iteration_vars,)), + # Include a deep copy of child entities + *deepcopy(children), + OpaqueFunction(function=ForEach._pop_locals), + ] + + @classmethod + def _push_locals( + cls, + context: LaunchContext, + ) -> Optional[List[LaunchDescriptionEntity]]: + context._push_locals() + return None + + @classmethod + def _pop_locals( + cls, + context: LaunchContext, + ) -> Optional[List[LaunchDescriptionEntity]]: + context._pop_locals() + return None + + @classmethod + def _set_args_local( + cls, + context: LaunchContext, + args: Mapping[str, Any], + ) -> Optional[List[LaunchDescriptionEntity]]: + context.extend_locals( + {ForEachVar.get_local_arg_name(name): str(value) for name, value in args.items()}) + return None + + +@expose_action('for') +class ForLoop(Action): + """ + Action that instantiates entities through a function N times, e.g., based on a launch argument. + + The number of iterations of the for-loop is defined by the provided length, which could be a + substitution, such as a :class:`launch.substitutions.LaunchConfiguration`. + + When using this action directly through Python, for each loop iteration, the provided callback + function is called with the index value, going from 0 to N (exclusive), and should return a + list of entities. The callback function must have one parameter: `i` of type `int`. + + Simple example: + + .. code-block:: python + + def for_i(i: int): + return [ + LogInfo(msg=['i=', str(i)]), + ] + + def generate_launch_description(): + return LaunchDescription([ + DeclareLaunchArgument('num', default_value='2'), + ForLoop(LaunchConfiguration('num'), function=for_i), + ]) + + When using this action through a frontend, provide entities to be instantiated for each loop + iteration as child entities. Use an `$(index)` substitution + (:class:`launch.substitutions.ForLoopIndex`) with the index name of the for-loop. + + Simple examples: + + .. code-block:: xml + + + + + + + + + .. code-block:: yaml + + launch: + - arg: + name: num + default: '2' + - for: + len: $(var num) + name: i + children: + - log: + message: i=$(index i) + + The above examples would ouput the following log messages by default: + + .. code-block:: text + + i=0 + i=1 + + If the 'num' launch argument was set to 5 (num:=5), then it would output: + + .. code-block:: text + + i=0 + i=1 + i=2 + i=3 + i=4 + """ + + class _CallbackFunction(Protocol): + + def __call__(self, i: int) -> Optional[List[LaunchDescriptionEntity]]: ... + + def __init__( + self, + length: SomeSubstitutionsType, + *, + function: _CallbackFunction, + name: Optional[str] = None, + **kwargs, + ) -> None: + """ + Create a ForLoop. + + :param length: the length of the for-loop; must be convertible to `int` through `int()`, + otherwise a `ValueError` will be raised during execution + :param function: a function that receives an integer loop index value (`i`) and returns + entities + :param name: the for-loop name, used as the index name with the ForLoopIndex substitution + """ + super().__init__(**kwargs) + + from ..utilities import normalize_to_list_of_substitutions # import here to avoid loop + self._length = normalize_to_list_of_substitutions(length) + self._function = function + self._name = name + self._logger = get_logger(__name__) + + @property + def length(self) -> List[Substitution]: + return self._length + + @property + def function(self) -> Callable[[int], Optional[List[LaunchDescriptionEntity]]]: + return self._function + + @property + def name(self) -> Optional[str]: + return self._name + + def describe(self) -> Text: + return ( + self.__class__.__name__ + + f"(length='{self._length}', name='{self._name}', function={self._function})" + ) + + @classmethod + def parse(cls, entity: Entity, parser: Parser): + """Return `ForLoop` action and kwargs for constructing it.""" + _, kwargs = super().parse(entity, parser) + length = entity.get_attr('len') + if length is not None: + kwargs['length'] = parser.parse_substitution(length) + name = entity.get_attr('name') + kwargs['name'] = name + parsed_children = [parser.parse_action(e) for e in entity.children] + + def for_i(i: int) -> List[LaunchDescriptionEntity]: + return ForEach._get_iteration_entities(parsed_children, {name: i}) + kwargs['function'] = for_i + return cls, kwargs + + def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEntity]]: + # Get the for-loop length and convert to int + length = int(perform_substitutions(context, self._length)) + self._logger.debug(f'for-loop length={length}') + # Create list of YAML (dict) strings + input_values = ForEach.SEPARATOR.join( + yaml.dump({'i': i}, default_flow_style=True).strip() + for i in range(length) + ) + self._logger.debug(f'input_values={input_values}') + return [ForEach(input_values, function=self._function)] diff --git a/launch/launch/substitutions/__init__.py b/launch/launch/substitutions/__init__.py index 1622debaa..825e5d33a 100644 --- a/launch/launch/substitutions/__init__.py +++ b/launch/launch/substitutions/__init__.py @@ -25,6 +25,8 @@ from .equals_substitution import EqualsSubstitution from .file_content import FileContent from .find_executable import FindExecutable +from .for_loop_var import ForEachVar +from .for_loop_var import ForLoopIndex from .if_else_substitution import IfElseSubstitution from .launch_configuration import LaunchConfiguration from .launch_log_dir import LaunchLogDir @@ -47,6 +49,8 @@ 'EnvironmentVariable', 'FileContent', 'FindExecutable', + 'ForEachVar', + 'ForLoopIndex', 'IfElseSubstitution', 'LaunchConfiguration', 'LaunchLogDir', diff --git a/launch/launch/substitutions/for_loop_var.py b/launch/launch/substitutions/for_loop_var.py new file mode 100644 index 000000000..89c513eae --- /dev/null +++ b/launch/launch/substitutions/for_loop_var.py @@ -0,0 +1,123 @@ +# Copyright 2025 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module for the ForLoopIndex substitution.""" + +from typing import List +from typing import Optional +from typing import Sequence +from typing import Text + +from ..frontend import expose_substitution +from ..launch_context import LaunchContext +from ..logging import get_logger +from ..some_substitutions_type import SomeSubstitutionsType +from ..substitution import Substitution +from ..utilities import perform_substitutions + + +@expose_substitution('for-var') +class ForEachVar(Substitution): + """Substitution for a :class:`launch.actions.ForEach` iteration variable value.""" + + def __init__( + self, + name: SomeSubstitutionsType, + *, + default_value: Optional[SomeSubstitutionsType] = None, + ) -> None: + """ + Create a ForEachVar. + + :param name: the name of the :class:`launch.actions.ForEach` iteration variable + :param default_value: a default value for the variable if a value is not available for a + given iteration + """ + super().__init__() + + from ..utilities import normalize_to_list_of_substitutions # import here to avoid loop + self._name = normalize_to_list_of_substitutions(name) + self._default_value = ( + normalize_to_list_of_substitutions(default_value) + if default_value is not None + else None + ) + self._logger = get_logger(__name__) + + @property + def name(self) -> List[Substitution]: + return self._name + + @property + def default_value(self) -> Optional[List[Substitution]]: + return self._default_value + + def describe(self) -> Text: + return ( + self.__class__.__name__ + + '(' + + f"name={' + '.join([sub.describe() for sub in self._name])}" + + ', ' + + f"default_value={' + '.join([sub.describe() for sub in self._default_value or []])}" + + ')' + ) + + @classmethod + def parse(cls, data: Sequence[SomeSubstitutionsType]): + if not any(len(data) == length for length in (1, 2)): + raise ValueError(f'{cls.__name__} substitution expects 1 or 2 arguments') + kwargs = {} + kwargs['name'] = data[0] + if len(data) == 2: + kwargs['default_value'] = data[1] + return cls, kwargs + + def perform(self, context: LaunchContext) -> Text: + name = perform_substitutions(context, self._name) + self._logger.debug(f'name={name}') + local_arg_name = self.get_local_arg_name(name) + if not hasattr(context.locals, local_arg_name): + if self._default_value is None: + raise RuntimeError( + f'No value available for {self.__class__.__name__} ' + f"'{name}' and no default value provided" + ) + value = perform_substitutions(context, self._default_value) + else: + value = getattr(context.locals, local_arg_name) + self._logger.debug(f'{name}={value}') + return value + + @classmethod + def get_local_arg_name(cls, name: str) -> str: + # Prevent local variable collisions + return f'ForEachVar__{name}' + + +@expose_substitution('index') +class ForLoopIndex(ForEachVar): + """Substitution for a :class:`launch.actions.ForLoop` iteration index value.""" + + def __init__( + self, + name: SomeSubstitutionsType, + **kwargs, + ) -> None: + """ + Create a ForLoopIndex. + + :param name: the name of the :class:`launch.actions.ForLoop` index which this substitution + is part of + """ + super().__init__(name) diff --git a/launch/test/launch/actions/test_for_loop.py b/launch/test/launch/actions/test_for_loop.py new file mode 100644 index 000000000..e5092a2a4 --- /dev/null +++ b/launch/test/launch/actions/test_for_loop.py @@ -0,0 +1,375 @@ +# Copyright 2025 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the ForEach and ForLoop actions.""" + +from typing import Any +from typing import Callable +from typing import List +from typing import Mapping +from typing import Optional + +from launch import Action +from launch import LaunchContext +from launch import LaunchDescriptionEntity +from launch.actions import DeclareLaunchArgument +from launch.actions import ForEach +from launch.actions import ForLoop +from launch.substitutions import LaunchConfiguration +from launch.substitutions import PythonExpression +from launch.substitutions import TextSubstitution +import pytest + + +def for_each( + returned_entities: List[LaunchDescriptionEntity], + args_collector: Optional[List[Mapping[str, Any]]] = None, +) -> Callable[..., Optional[List[LaunchDescriptionEntity]]]: + def f(**kwargs) -> List[LaunchDescriptionEntity]: + if args_collector is not None: + args_collector.append(kwargs) + return returned_entities + return f + + +def test_for_each_constructors(): + """Test the constructors for the ForEach class.""" + f = for_each([]) + action = ForEach('{};{}', function=f) + assert len(action.input_values) == 1 + assert isinstance(action.input_values[0], TextSubstitution) + assert action.input_values[0].text == '{};{}' + assert action.function == f + assert action.describe().startswith('ForEach') + + action = ForEach(TextSubstitution(text='{}'), function=f) + assert len(action.input_values) == 1 + assert isinstance(action.input_values[0], TextSubstitution) + assert action.input_values[0].text == '{}' + assert action.function == f + + ForEach(LaunchConfiguration('config'), function=f) + assert action.function == f + + +def test_for_each_execute(): + """Test the execute() of the ForEach class.""" + context = LaunchContext() + iter_values = [] + + # Should not iterate when input is empty + result = ForEach('', function=for_each([], iter_values)).visit(context) + assert len(result) == 0 + assert iter_values == [] + iter_values.clear() + + result = ForEach('', function=for_each(None, iter_values)).visit(context) + assert len(result) == 0 + assert iter_values == [] + iter_values.clear() + + result = ForEach('', function=for_each([Action()], iter_values)).visit(context) + assert len(result) == 0 + assert iter_values == [] + iter_values.clear() + + result = ForEach(';', function=for_each([Action()], iter_values)).visit(context) + assert len(result) == 0 + assert iter_values == [] + iter_values.clear() + + # Should still iterate with no args + result = ForEach('{}', function=for_each([], iter_values)).visit(context) + assert len(result) == 0 + assert iter_values == [{}] + iter_values.clear() + + result = ForEach('{}', function=for_each(None, iter_values)).visit(context) + assert len(result) == 0 + assert iter_values == [{}] + iter_values.clear() + + result = ForEach('{};', function=for_each(None, iter_values)).visit(context) + assert len(result) == 0 + assert iter_values == [{}] + iter_values.clear() + + result = ForEach('{};{}', function=for_each([], iter_values)).visit(context) + assert len(result) == 0 + assert iter_values == [{}, {}] + iter_values.clear() + + result = ForEach(' {} ; {} ', function=for_each([], iter_values)).visit(context) + assert len(result) == 0 + assert iter_values == [{}, {}] + iter_values.clear() + + result = ForEach('{};{};{}', function=for_each([Action()], iter_values)).visit(context) + assert len(result) == 3 + assert isinstance(result[0], Action) + assert isinstance(result[1], Action) + assert isinstance(result[2], Action) + assert iter_values == [{}, {}, {}] + iter_values.clear() + + # Normal case + result = ForEach("{name: 'a'};{name: 'b'}", function=for_each([], iter_values)).visit(context) + assert len(result) == 0 + assert iter_values == [{'name': 'a'}, {'name': 'b'}] + iter_values.clear() + + result = ForEach( + "{name: 'a'};{name: 'b'}", + function=for_each(None, iter_values), + ).visit(context) + assert len(result) == 0 + assert iter_values == [{'name': 'a'}, {'name': 'b'}] + iter_values.clear() + + result = ForEach( + "{name: 'a'};{name: 'b'}", + function=for_each([Action()], iter_values), + ).visit(context) + assert len(result) == 2 + assert isinstance(result[0], Action) + assert isinstance(result[1], Action) + assert iter_values == [{'name': 'a'}, {'name': 'b'}] + iter_values.clear() + + +def test_for_each_execute_substitution(): + context = LaunchContext() + iter_values = [] + + # Text + result = ForEach(TextSubstitution(text=''), function=for_each([], iter_values)).visit(context) + assert len(result) == 0 + assert iter_values == [] + iter_values.clear() + + # Launch arg, first with default value then non-default value + DeclareLaunchArgument('config', default_value='{id: 42};{id: 27}').visit(context) + result = ForEach( + LaunchConfiguration('config'), + function=for_each([Action()], iter_values), + ).visit(context) + assert len(result) == 2 + assert iter_values == [{'id': 42}, {'id': 27}] + iter_values.clear() + context.launch_configurations['config'] = '{id: 6};{id: 9};{id: 420}' + result = ForEach( + LaunchConfiguration('config'), + function=for_each([Action()], iter_values), + ).visit(context) + assert len(result) == 3 + assert iter_values == [{'id': 6}, {'id': 9}, {'id': 420}] + iter_values.clear() + context.launch_configurations.clear() + + # Python expression + DeclareLaunchArgument('num', default_value='3').visit(context) + result = ForEach( + PythonExpression( + ["';'.join([str({'num': i * 2}) for i in range(", LaunchConfiguration('num'), ')])'], + ), + function=for_each([Action(), Action()], iter_values), + ).visit(context) + assert len(result) == 6 + assert isinstance(result[0], Action) + assert isinstance(result[1], Action) + assert isinstance(result[2], Action) + assert isinstance(result[3], Action) + assert isinstance(result[4], Action) + assert isinstance(result[5], Action) + assert iter_values == [{'num': 0}, {'num': 2}, {'num': 4}] + iter_values.clear() + context.launch_configurations.clear() + + +def for_each_args( + returned_entities: List[LaunchDescriptionEntity], + args_collector: Optional[List[Mapping[str, Any]]] = None, +) -> Callable[..., Optional[List[LaunchDescriptionEntity]]]: + def f(a: str, b: int, c: List[int]) -> List[LaunchDescriptionEntity]: + if args_collector is not None: + args_collector.append({'a': a, 'b': b, 'c': c}) + return returned_entities + return f + + +def for_each_args_kwargs( + returned_entities: List[LaunchDescriptionEntity], + args_collector: Optional[List[Mapping[str, Any]]] = None, +) -> Callable[..., Optional[List[LaunchDescriptionEntity]]]: + def f(a: str, *, b: int, c: List[int]) -> List[LaunchDescriptionEntity]: + if args_collector is not None: + args_collector.append({'a': a, 'b': b, 'c': c}) + return returned_entities + return f + + +def for_each_args_kwargs_default( + returned_entities: List[LaunchDescriptionEntity], + args_collector: Optional[List[Mapping[str, Any]]] = None, +) -> Callable[..., Optional[List[LaunchDescriptionEntity]]]: + def f(a: str, *, b: int, c: List[int] = [4, 2, 0]) -> List[LaunchDescriptionEntity]: + if args_collector is not None: + args_collector.append({'a': a, 'b': b, 'c': c}) + return returned_entities + return f + + +def test_for_each_execute_args(): + context = LaunchContext() + iter_values = [] + + # Order of items in YAML dicts does not matter + result = ForEach( + '{a: 1, b: 2, c: 3};{c: 1, a: 2, b: 3}', + function=for_each([], iter_values), + ).visit(context) + assert len(result) == 0 + assert iter_values == [{'a': 1, 'b': 2, 'c': 3}, {'a': 2, 'b': 3, 'c': 1}] + iter_values.clear() + + # Using callback with args + result = ForEach( + "{a: 'tw', b: 1, c: []};{c: [4], a: 'll', b: 2}", + function=for_each_args([], iter_values), + ).visit(context) + assert len(result) == 0 + assert iter_values == [{'a': 'tw', 'b': 1, 'c': []}, {'a': 'll', 'b': 2, 'c': [4]}] + iter_values.clear() + + # Using callback with args and kwargs + result = ForEach( + "{a: 'tw', b: 1, c: []};{c: [4], a: 'll', b: 2}", + function=for_each_args_kwargs([], iter_values), + ).visit(context) + assert len(result) == 0 + assert iter_values == [{'a': 'tw', 'b': 1, 'c': []}, {'a': 'll', 'b': 2, 'c': [4]}] + iter_values.clear() + + # Using callback with args, kwargs, and default values + result = ForEach( + "{a: 'tw', b: 1, c: []};{a: 'll', b: 2}", + function=for_each_args_kwargs_default([], iter_values), + ).visit(context) + assert len(result) == 0 + assert iter_values == [{'a': 'tw', 'b': 1, 'c': []}, {'a': 'll', 'b': 2, 'c': [4, 2, 0]}] + iter_values.clear() + + +def for_i( + returned_entities: List[LaunchDescriptionEntity], + i_collector: Optional[List[int]] = None, +) -> Callable[[int], Optional[List[LaunchDescriptionEntity]]]: + def f(i: int) -> List[LaunchDescriptionEntity]: + if i_collector is not None: + i_collector.append(i) + return returned_entities + return f + + +def test_for_loop_constructors(): + """Test the constructors for the ForLoop class.""" + f = for_i([]) + action = ForLoop('2', function=f, name='my-for-loop') + assert len(action.length) == 1 + assert isinstance(action.length[0], TextSubstitution) + assert action.length[0].text == '2' + assert action.function == f + assert action.name == 'my-for-loop' + assert action.describe().startswith('ForLoop') + + action = ForLoop(LaunchConfiguration('num'), function=f) + assert action.function == f + + +def test_for_loop_execute(): + """Test the execute() of the ForLoop class.""" + context = LaunchContext() + i_values = [] + + # Empty input + with pytest.raises(ValueError): + ForLoop('', function=for_i([], i_values)).visit(context) + + # No iterations + result = ForLoop('0', function=for_i([], i_values)).visit(context) + assert len(result) == 1 + assert isinstance(result[0], ForEach) + result_for_each = result[0].visit(context) + assert len(result_for_each) == 0 + assert i_values == [] + i_values.clear() + + result = ForLoop('0', function=for_i(None, i_values)).visit(context) + assert len(result) == 1 + result_for_each = result[0].visit(context) + assert len(result_for_each) == 0 + assert i_values == [] + i_values.clear() + + # Normal case + result = ForLoop('2', function=for_i([], i_values)).visit(context) + assert len(result) == 1 + result_for_each = result[0].visit(context) + assert len(result_for_each) == 0 + assert i_values == [0, 1] + i_values.clear() + + result = ForLoop('0', function=for_i([Action()], i_values)).visit(context) + assert len(result) == 1 + result_for_each = result[0].visit(context) + assert len(result_for_each) == 0 + assert i_values == [] + i_values.clear() + + result = ForLoop('2', function=for_i([Action()], i_values)).visit(context) + assert len(result) == 1 + result_for_each = result[0].visit(context) + assert len(result_for_each) == 2 + assert isinstance(result_for_each[0], Action) + assert isinstance(result_for_each[1], Action) + assert i_values == [0, 1] + i_values.clear() + + +def test_for_loop_execute_substitutions(): + context = LaunchContext() + i_values = [] + + # Launch arg, first with default value then non-default value + DeclareLaunchArgument('num', default_value='4').visit(context) + result = ForLoop( + LaunchConfiguration('num'), + function=for_i([Action()], i_values), + ).visit(context) + assert len(result) == 1 + result_for_each = result[0].visit(context) + assert len(result_for_each) == 4 + assert i_values == [0, 1, 2, 3] + i_values.clear() + context.launch_configurations['num'] = '5' + result = ForLoop( + LaunchConfiguration('num'), + function=for_i([Action()], i_values), + ).visit(context) + assert len(result) == 1 + result_for_each = result[0].visit(context) + assert len(result_for_each) == 5 + assert i_values == [0, 1, 2, 3, 4] + i_values.clear() + context.launch_configurations.clear() diff --git a/launch/test/launch/substitutions/test_for_loop_var.py b/launch/test/launch/substitutions/test_for_loop_var.py new file mode 100644 index 000000000..370e54699 --- /dev/null +++ b/launch/test/launch/substitutions/test_for_loop_var.py @@ -0,0 +1,37 @@ +# Copyright 2025 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the ForEachVar substitution.""" + +from launch import LaunchContext +from launch.substitutions import ForEachVar +from launch.substitutions import TextSubstitution +import pytest + + +def test_for_each_var(): + context = LaunchContext() + + # No value available, no default value provided + with pytest.raises(RuntimeError): + ForEachVar('name').perform(context) + + # No value available, default value provided + value = ForEachVar('name', default_value='default').perform(context) + assert value == 'default' + + # Value available + context.extend_locals({ForEachVar.get_local_arg_name('name'): 'some_value'}) + value = ForEachVar(TextSubstitution(text='name')).perform(context) + assert value == 'some_value' diff --git a/launch_xml/test/launch_xml/test_for_loop.py b/launch_xml/test/launch_xml/test_for_loop.py new file mode 100644 index 000000000..0d0bdd52c --- /dev/null +++ b/launch_xml/test/launch_xml/test_for_loop.py @@ -0,0 +1,154 @@ +# Copyright 2025 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test parsing a ForLoop action and a ForLoopIndex substitution.""" + +import io +import textwrap + +from launch.actions import DeclareLaunchArgument +from launch.actions import ForEach +from launch.actions import ForLoop +from launch.actions import LogInfo +from launch.actions import OpaqueFunction +from launch.frontend import Parser +from launch.launch_context import LaunchContext +from launch.substitutions import ForEachVar +from launch.substitutions import ForLoopIndex +from launch.utilities import perform_substitutions + + +def test_for_each(): + xml_file = textwrap.dedent( + """ + + + + + + + """ + ) + root_entity, parser = Parser.load(io.StringIO(xml_file)) + ld = parser.parse_description(root_entity) + + assert len(ld.entities) == 2 + assert isinstance(ld.entities[0], DeclareLaunchArgument) + assert isinstance(ld.entities[1], ForEach) + + lc = LaunchContext() + ld.entities[0].visit(lc) + actions = ld.entities[1].visit(lc) + # For each iteration: + # 2 OpaqueFunction + # N user-defined entities + # 1 OpaqueFunction + # = 3 + N entitites/iteration + assert len(actions) == 3 * (3 + 1) + assert isinstance(actions[0], OpaqueFunction) + assert isinstance(actions[1], OpaqueFunction) + assert isinstance(actions[2], LogInfo) + assert isinstance(actions[3], OpaqueFunction) + assert isinstance(actions[4], OpaqueFunction) + assert isinstance(actions[5], OpaqueFunction) + assert isinstance(actions[6], LogInfo) + assert isinstance(actions[7], OpaqueFunction) + assert isinstance(actions[8], OpaqueFunction) + assert isinstance(actions[9], OpaqueFunction) + assert isinstance(actions[10], LogInfo) + assert isinstance(actions[11], OpaqueFunction) + actions[0].visit(lc) + actions[1].visit(lc) + actions[2].visit(lc) + assert isinstance(actions[2].msg[1], ForEachVar) + assert perform_substitutions(lc, actions[2].msg[1].name) == 'name' + assert perform_substitutions(lc, actions[2].msg[3].name) == 'id' + assert perform_substitutions(lc, actions[2].msg[5].name) == 'opt' + assert perform_substitutions(lc, actions[2].msg) == "'a' id=1 (none)" + actions[3].visit(lc) + actions[4].visit(lc) + actions[5].visit(lc) + actions[6].visit(lc) + assert isinstance(actions[6].msg[1], ForEachVar) + assert perform_substitutions(lc, actions[6].msg[1].name) == 'name' + assert perform_substitutions(lc, actions[6].msg[3].name) == 'id' + assert perform_substitutions(lc, actions[6].msg[5].name) == 'opt' + assert perform_substitutions(lc, actions[6].msg) == "'b' id=2 (none)" + actions[7].visit(lc) + actions[8].visit(lc) + actions[9].visit(lc) + actions[10].visit(lc) + assert isinstance(actions[10].msg[1], ForEachVar) + assert perform_substitutions(lc, actions[10].msg[1].name) == 'name' + assert perform_substitutions(lc, actions[10].msg[3].name) == 'id' + assert perform_substitutions(lc, actions[10].msg[5].name) == 'opt' + assert perform_substitutions(lc, actions[10].msg) == "'c' id=0 (*)" + actions[11].visit(lc) + + +def test_for_loop(): + xml_file = textwrap.dedent( + """ + + + + + + + """ + ) + root_entity, parser = Parser.load(io.StringIO(xml_file)) + ld = parser.parse_description(root_entity) + + assert len(ld.entities) == 2 + assert isinstance(ld.entities[0], DeclareLaunchArgument) + assert isinstance(ld.entities[1], ForLoop) + + lc = LaunchContext() + ld.entities[0].visit(lc) + actions = ld.entities[1].visit(lc) + assert len(actions) == 1 + assert isinstance(actions[0], ForEach) + actions_for_each = actions[0].visit(lc) + # For each iteration: + # 2 OpaqueFunction + # N user-defined entities + # 1 OpaqueFunction + # = 3 + N entitites/iteration + assert len(actions_for_each) == 2 * (3 + 1) + assert isinstance(actions_for_each[0], OpaqueFunction) + assert isinstance(actions_for_each[1], OpaqueFunction) + assert isinstance(actions_for_each[2], LogInfo) + assert isinstance(actions_for_each[3], OpaqueFunction) + assert isinstance(actions_for_each[4], OpaqueFunction) + assert isinstance(actions_for_each[5], OpaqueFunction) + assert isinstance(actions_for_each[6], LogInfo) + assert isinstance(actions_for_each[7], OpaqueFunction) + actions_for_each[0].visit(lc) + actions_for_each[1].visit(lc) + actions_for_each[2].visit(lc) + assert isinstance(actions_for_each[2].msg[1], ForLoopIndex) + assert perform_substitutions(lc, actions_for_each[2].msg[1].name) == 'i' + assert perform_substitutions(lc, actions_for_each[2].msg) == 'index=0' + actions_for_each[3].visit(lc) + actions_for_each[4].visit(lc) + actions_for_each[5].visit(lc) + actions_for_each[6].visit(lc) + assert isinstance(actions_for_each[6].msg[1], ForLoopIndex) + assert perform_substitutions(lc, actions_for_each[6].msg[1].name) == 'i' + assert perform_substitutions(lc, actions_for_each[6].msg) == 'index=1' + actions_for_each[7].visit(lc) diff --git a/launch_yaml/test/launch_yaml/test_for_loop.py b/launch_yaml/test/launch_yaml/test_for_loop.py new file mode 100644 index 000000000..e0c90b3c6 --- /dev/null +++ b/launch_yaml/test/launch_yaml/test_for_loop.py @@ -0,0 +1,158 @@ +# Copyright 2025 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test parsing a ForLoop action and a ForLoopIndex substitution.""" + +import io +import textwrap + +from launch.actions import DeclareLaunchArgument +from launch.actions import ForEach +from launch.actions import ForLoop +from launch.actions import LogInfo +from launch.actions import OpaqueFunction +from launch.frontend import Parser +from launch.launch_context import LaunchContext +from launch.substitutions import ForEachVar +from launch.substitutions import ForLoopIndex +from launch.utilities import perform_substitutions + + +def test_for_each(): + xml_file = textwrap.dedent( + """ + launch: + - arg: + name: robots + default: "{name: 'a', id: 1};{name: 'b', id: 2};{name: 'c', opt: '*'}" + - for_each: + values: $(var robots) + children: + - log: + message: "'$(for-var name)' id=$(for-var id 0) ($(for-var opt 'none'))" + """ + ) + root_entity, parser = Parser.load(io.StringIO(xml_file)) + ld = parser.parse_description(root_entity) + + assert len(ld.entities) == 2 + assert isinstance(ld.entities[0], DeclareLaunchArgument) + assert isinstance(ld.entities[1], ForEach) + + lc = LaunchContext() + ld.entities[0].visit(lc) + actions = ld.entities[1].visit(lc) + # For each iteration: + # 2 OpaqueFunction + # N user-defined entities + # 1 OpaqueFunction + # = 3 + N entitites/iteration + assert len(actions) == 3 * (3 + 1) + assert isinstance(actions[0], OpaqueFunction) + assert isinstance(actions[1], OpaqueFunction) + assert isinstance(actions[2], LogInfo) + assert isinstance(actions[3], OpaqueFunction) + assert isinstance(actions[4], OpaqueFunction) + assert isinstance(actions[5], OpaqueFunction) + assert isinstance(actions[6], LogInfo) + assert isinstance(actions[7], OpaqueFunction) + assert isinstance(actions[8], OpaqueFunction) + assert isinstance(actions[9], OpaqueFunction) + assert isinstance(actions[10], LogInfo) + assert isinstance(actions[11], OpaqueFunction) + actions[0].visit(lc) + actions[1].visit(lc) + actions[2].visit(lc) + assert isinstance(actions[2].msg[1], ForEachVar) + assert perform_substitutions(lc, actions[2].msg[1].name) == 'name' + assert perform_substitutions(lc, actions[2].msg[3].name) == 'id' + assert perform_substitutions(lc, actions[2].msg[5].name) == 'opt' + assert perform_substitutions(lc, actions[2].msg) == "'a' id=1 (none)" + actions[3].visit(lc) + actions[4].visit(lc) + actions[5].visit(lc) + actions[6].visit(lc) + assert isinstance(actions[6].msg[1], ForEachVar) + assert perform_substitutions(lc, actions[6].msg[1].name) == 'name' + assert perform_substitutions(lc, actions[6].msg[3].name) == 'id' + assert perform_substitutions(lc, actions[6].msg[5].name) == 'opt' + assert perform_substitutions(lc, actions[6].msg) == "'b' id=2 (none)" + actions[7].visit(lc) + actions[8].visit(lc) + actions[9].visit(lc) + actions[10].visit(lc) + assert isinstance(actions[10].msg[1], ForEachVar) + assert perform_substitutions(lc, actions[10].msg[1].name) == 'name' + assert perform_substitutions(lc, actions[10].msg[3].name) == 'id' + assert perform_substitutions(lc, actions[10].msg[5].name) == 'opt' + assert perform_substitutions(lc, actions[10].msg) == "'c' id=0 (*)" + actions[11].visit(lc) + + +def test_for_loop(): + yaml_file = textwrap.dedent( + """ + launch: + - arg: + name: num_i + default: '2' + - for: + len: $(var num_i) + name: i + children: + - log: + message: index=$(index i) + """ + ) + root_entity, parser = Parser.load(io.StringIO(yaml_file)) + ld = parser.parse_description(root_entity) + + assert len(ld.entities) == 2 + assert isinstance(ld.entities[0], DeclareLaunchArgument) + assert isinstance(ld.entities[1], ForLoop) + + lc = LaunchContext() + ld.entities[0].visit(lc) + actions = ld.entities[1].visit(lc) + assert len(actions) == 1 + assert isinstance(actions[0], ForEach) + actions_for_each = actions[0].visit(lc) + # For each iteration: + # 2 OpaqueFunction + # N user-defined entities + # 1 OpaqueFunction + # = 3 + N entitites/iteration + assert len(actions_for_each) == 2 * (3 + 1) + assert isinstance(actions_for_each[0], OpaqueFunction) + assert isinstance(actions_for_each[1], OpaqueFunction) + assert isinstance(actions_for_each[2], LogInfo) + assert isinstance(actions_for_each[3], OpaqueFunction) + assert isinstance(actions_for_each[4], OpaqueFunction) + assert isinstance(actions_for_each[5], OpaqueFunction) + assert isinstance(actions_for_each[6], LogInfo) + assert isinstance(actions_for_each[7], OpaqueFunction) + actions_for_each[0].visit(lc) + actions_for_each[1].visit(lc) + actions_for_each[2].visit(lc) + assert isinstance(actions_for_each[2].msg[1], ForLoopIndex) + assert perform_substitutions(lc, actions_for_each[2].msg[1].name) == 'i' + assert perform_substitutions(lc, actions_for_each[2].msg) == 'index=0' + actions_for_each[3].visit(lc) + actions_for_each[4].visit(lc) + actions_for_each[5].visit(lc) + actions_for_each[6].visit(lc) + assert isinstance(actions_for_each[6].msg[1], ForLoopIndex) + assert perform_substitutions(lc, actions_for_each[6].msg[1].name) == 'i' + assert perform_substitutions(lc, actions_for_each[6].msg) == 'index=1' + actions_for_each[7].visit(lc)