Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set multiple parameters #728

Open
wants to merge 2 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 40 additions & 22 deletions ros2param/ros2param/verb/set.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import sys

from rcl_interfaces.msg import Parameter
Expand All @@ -28,6 +29,17 @@
from ros2param.verb import VerbExtension


class RequireParameterPairAction(argparse.Action):
"""Argparse action to validate parameter argument pairs."""

def __call__(self, parser, args, values, option_string=None):
if len(values) == 0:
parser.error('No parameters specified')
if len(values) % 2:
parser.error('Must provide parameter name and value pairs')
setattr(args, self.dest, values)


class SetVerb(VerbExtension):
"""Set parameter."""

Expand All @@ -41,38 +53,44 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'--include-hidden-nodes', action='store_true',
help='Consider hidden nodes as well')
arg = parser.add_argument(
'parameter_name', help='Name of the parameter')
'parameters', nargs='*',
action=RequireParameterPairAction,
help='List of parameter name and value pairs i.e. "int_param 1 str_param hello_world"')
arg.completer = ParameterNameCompleter()
parser.add_argument(
'value', help='Value of the parameter')

def build_parameters(self, params):
ihasdapie marked this conversation as resolved.
Show resolved Hide resolved
parameters = []
for i in range(0, len(params), 2):
parameter = Parameter()
parameter.name = params[i]
parameter.value = get_parameter_value(string_value=params[i+1])
parameters.append(parameter)
return parameters

def main(self, *, args): # noqa: D102
with NodeStrategy(args) as node:
node_names = get_node_names(
node=node, include_hidden_nodes=args.include_hidden_nodes)

node_name = get_absolute_node_name(args.node_name)

ihasdapie marked this conversation as resolved.
Show resolved Hide resolved
if node_name not in {n.full_name for n in node_names}:
return 'Node not found'

with DirectNode(args) as node:
parameter = Parameter()
Parameter.name = args.parameter_name
parameter.value = get_parameter_value(string_value=args.value)

parameters = self.build_parameters(args.parameters)
response = call_set_parameters(
node=node, node_name=args.node_name, parameters=[parameter])
node=node, node_name=args.node_name, parameters=parameters)
results = response.results

# output response
assert len(response.results) == 1
result = response.results[0]
if result.successful:
msg = 'Set parameter successful'
if result.reason:
msg += ': ' + result.reason
print(msg)
else:
msg = 'Setting parameter failed'
if result.reason:
msg += ': ' + result.reason
print(msg, file=sys.stderr)
for i, result in enumerate(results):
msg = "Set parameter '%s' " % (parameters[i].name)
if result.successful:
msg += 'successful'
if result.reason:
msg += ': ' + result.reason
print(msg)
else:
msg += 'failed'
if result.reason:
msg += ': ' + result.reason
print(msg, file=sys.stderr)
222 changes: 222 additions & 0 deletions ros2param/test/test_verb_set.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
# Copyright 2022 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import os
import sys
import time
import unittest
import xmlrpc

from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch_ros.actions import Node
import launch_testing
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools
import launch_testing_ros.tools

import pytest

import rclpy
from rclpy.utilities import get_available_rmw_implementations

from ros2cli.node.strategy import NodeStrategy


TEST_NODE = 'test_node'
TEST_NAMESPACE = '/foo'
TEST_TIMEOUT = 20.0

# Skip cli tests on Windows while they exhibit pathological behavior
# https://github.com/ros2/build_farmer/issues/248
if sys.platform.startswith('win'):
pytest.skip(
'CLI tests can block for a pathological amount of time on Windows.',
allow_module_level=True)


@pytest.mark.rostest
@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
def generate_test_description(rmw_implementation):
path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures')
additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}

# Parameter node test fixture
path_to_parameter_node_script = os.path.join(path_to_fixtures, 'parameter_node.py')
parameter_node = Node(
executable=sys.executable,
name=TEST_NODE,
namespace=TEST_NAMESPACE,
arguments=[path_to_parameter_node_script],
additional_env=additional_env
)

return LaunchDescription([
# TODO(jacobperron): Provide a common RestartCliDaemon launch action in ros2cli
ExecuteProcess(
cmd=['ros2', 'daemon', 'stop'],
name='daemon-stop',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'daemon', 'start'],
name='daemon-start',
on_exit=[
parameter_node,
launch_testing.actions.ReadyToTest(),
],
additional_env=additional_env
)
]
),
])


class TestVerbSet(unittest.TestCase):

@classmethod
def setUpClass(
cls,
launch_service,
proc_info,
proc_output,
rmw_implementation
):
rmw_implementation_filter = launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=rmw_implementation
)

@contextlib.contextmanager
def launch_param_set_command(self, arguments):
param_set_command_action = ExecuteProcess(
cmd=['ros2', 'param', 'set', *arguments],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
},
name='ros2param-set-cli',
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, param_set_command_action, proc_info, proc_output,
output_filter=rmw_implementation_filter
) as param_set_command:
yield param_set_command
cls.launch_param_set_command = launch_param_set_command

@contextlib.contextmanager
def launch_param_dump_command(self, arguments):
param_dump_command_action = ExecuteProcess(
cmd=['ros2', 'param', 'dump', *arguments],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
},
name='ros2param-dump-cli',
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, param_dump_command_action, proc_info, proc_output,
output_filter=rmw_implementation_filter
) as param_dump_command:
yield param_dump_command
cls.launch_param_dump_command = launch_param_dump_command

def setUp(self):
# Ensure the daemon node is running and discovers the test node
start_time = time.time()
timed_out = True
with NodeStrategy(None) as node:
while (time.time() - start_time) < TEST_TIMEOUT:
# TODO(jacobperron): Create a generic 'CliNodeError' so we can treat errors
# from DirectNode and DaemonNode the same
try:
services = node.get_service_names_and_types_by_node(TEST_NODE, TEST_NAMESPACE)
except rclpy.node.NodeNameNonExistentError:
continue
except xmlrpc.client.Fault as e:
if 'NodeNameNonExistentError' in e.faultString:
continue
raise

service_names = [name_type_tuple[0] for name_type_tuple in services]
if (
len(service_names) > 0
and f'{TEST_NAMESPACE}/{TEST_NODE}/get_parameters' in service_names
):
timed_out = False
break
if timed_out:
self.fail(f'CLI daemon failed to find test node after {TEST_TIMEOUT} seconds')

def test_verb_set_single(self):
with self.launch_param_set_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', 'int_param', '1']
) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=["Set parameter 'int_param' successful"],
text=param_load_command.output,
strict=True)

def test_verb_set_multiple(self):
with self.launch_param_set_command(
arguments=[
f'{TEST_NAMESPACE}/{TEST_NODE}',
'int_param', '1',
'str_param', 'hello world',
'bool_param', 'True',
]
) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
"Set parameter 'int_param' successful",
"Set parameter 'str_param' successful",
"Set parameter 'bool_param' successful"],
text=param_load_command.output,
strict=True)

def test_verb_set_odd_params(self):
with self.launch_param_set_command(
arguments=[
f'{TEST_NAMESPACE}/{TEST_NODE}',
'int_param', '1',
'str_param', 'hello world',
'bool_param',
]
) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code == 2
assert launch_testing.tools.expect_output(
expected_lines=['ros2 param set: error: Must provide parameter name and value pairs'],
text=param_load_command.output,
strict=False
)

def test_verb_set_invalid_node(self):
with self.launch_param_set_command(
arguments=[
f'{TEST_NAMESPACE}/{TEST_NODE}_invalid',
'int_param', '123',
]
) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code == 1
assert launch_testing.tools.expect_output(
expected_lines=['Node not found'],
text=param_load_command.output,
strict=True)