Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions ros2_controllers_test_nodes/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>ros2_controllers_test_nodes</name>
<version>0.0.0</version>
<description>Demo nodes for showing and testing functionalities of the ros2_control framework.</description>

<maintainer email="[email protected]">Denis Štogl</maintainer>

<license>Apache-2.0</license>

<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>trajectory_msgs</depend>

<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2022 Stogl Robotics Consulting UG (haftungsbeschränkt)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2022 Stogl Robotics Consulting UG (haftungsbeschränkt)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author: Denis Štogl
#

import rclpy
from rclpy.node import Node

from std_msgs.msg import Float64MultiArray


class PublisherForwardPosition(Node):
def __init__(self):
super().__init__("publisher_forward_position_controller")
# Declare all parameters
self.declare_parameter("controller_name", "forward_position_controller")
self.declare_parameter("wait_sec_between_publish", 5)
self.declare_parameter("goal_names", ["pos1", "pos2"])

# Read parameters
controller_name = self.get_parameter("controller_name").value
wait_sec_between_publish = self.get_parameter("wait_sec_between_publish").value
goal_names = self.get_parameter("goal_names").value

# Read all positions from parameters
self.goals = []
for name in goal_names:
self.declare_parameter(name)
goal = self.get_parameter(name).value
if goal is None or len(goal) == 0:
raise Exception(f'Values for goal "{name}" not set!')

float_goal = []
for value in goal:
float_goal.append(float(value))
self.goals.append(float_goal)

publish_topic = "/" + controller_name + "/" + "commands"

self.get_logger().info(
f'Publishing {len(goal_names)} goals on topic "{publish_topic}"\
every {wait_sec_between_publish} s'
)

self.publisher_ = self.create_publisher(Float64MultiArray, publish_topic, 1)
self.timer = self.create_timer(wait_sec_between_publish, self.timer_callback)
self.i = 0

def timer_callback(self):
msg = Float64MultiArray()
msg.data = self.goals[self.i]
self.get_logger().info(f'Publishing: "{msg.data}"')
self.publisher_.publish(msg)
self.i += 1
self.i %= len(self.goals)


def main(args=None):
rclpy.init(args=args)

publisher_forward_position = PublisherForwardPosition()

rclpy.spin(publisher_forward_position)
publisher_forward_position.destroy_node()
rclpy.shutdown()


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Copyright 2022 Stogl Robotics Consulting UG (haftungsbeschränkt)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author: Denis Štogl
#

import rclpy
from rclpy.node import Node
from builtin_interfaces.msg import Duration

from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState


class PublisherJointTrajectory(Node):
def __init__(self):
super().__init__("publisher_position_trajectory_controller")
# Declare all parameters
self.declare_parameter("controller_name", "position_trajectory_controller")
self.declare_parameter("wait_sec_between_publish", 6)
self.declare_parameter("goal_names", ["pos1", "pos2"])
self.declare_parameter("joints")
self.declare_parameter("check_starting_point", False)
self.declare_parameter("starting_point_limits")

# Read parameters
controller_name = self.get_parameter("controller_name").value
wait_sec_between_publish = self.get_parameter("wait_sec_between_publish").value
goal_names = self.get_parameter("goal_names").value
self.joints = self.get_parameter("joints").value
self.check_starting_point = self.get_parameter("check_starting_point").value
self.starting_point = {}

if self.joints is None or len(self.joints) == 0:
raise Exception('"joints" parameter is not set!')

# starting point stuff
if self.check_starting_point:
# declare nested params
for name in self.joints:
param_name_tmp = "starting_point_limits" + "." + name
self.declare_parameter(param_name_tmp, [-2 * 3.14159, 2 * 3.14159])
self.starting_point[name] = self.get_parameter(param_name_tmp).value

for name in self.joints:
if len(self.starting_point[name]) != 2:
raise Exception('"starting_point" parameter is not set correctly!')
self.joint_state_sub = self.create_subscription(
JointState, "joint_states", self.joint_state_callback, 10
)
# initialize starting point status
if not self.check_starting_point:
self.starting_point_ok = True
else:
self.starting_point_ok = False

self.joint_state_msg_received = False

# Read all positions from parameters
self.goals = []
for name in goal_names:
self.declare_parameter(name)
goal = self.get_parameter(name).value
if goal is None or len(goal) == 0:
raise Exception(f'Values for goal "{name}" not set!')

float_goal = []
for value in goal:
float_goal.append(float(value))
self.goals.append(float_goal)

publish_topic = "/" + controller_name + "/" + "joint_trajectory"

self.get_logger().info(
'Publishing {} goals on topic "{}" every {} s'.format(
len(goal_names), publish_topic, wait_sec_between_publish
)
)

self.publisher_ = self.create_publisher(JointTrajectory, publish_topic, 1)
self.timer = self.create_timer(wait_sec_between_publish, self.timer_callback)
self.i = 0

def timer_callback(self):

if self.starting_point_ok:

traj = JointTrajectory()
traj.joint_names = self.joints
point = JointTrajectoryPoint()
point.positions = self.goals[self.i]
point.time_from_start = Duration(sec=4)

traj.points.append(point)
self.publisher_.publish(traj)

self.i += 1
self.i %= len(self.goals)

elif self.check_starting_point and not self.joint_state_msg_received:
self.get_logger().warn(
'Start configuration could not be checked! Check "joint_state" topic!'
)
else:
self.get_logger().warn("Start configuration is not within configured limits!")

def joint_state_callback(self, msg):

if not self.joint_state_msg_received:

# check start state
limit_exceeded = [False] * len(msg.name)
for idx, enum in enumerate(msg.name):
if (msg.position[idx] < self.starting_point[enum][0]) or (
msg.position[idx] > self.starting_point[enum][1]
):
self.get_logger().warn(f"Starting point limits exceeded for joint {enum} !")
limit_exceeded[idx] = True

if any(limit_exceeded):
self.starting_point_ok = False
else:
self.starting_point_ok = True

self.joint_state_msg_received = True
else:
return


def main(args=None):
rclpy.init(args=args)

publisher_joint_trajectory = PublisherJointTrajectory()

rclpy.spin(publisher_joint_trajectory)
publisher_joint_trajectory.destroy_node()
rclpy.shutdown()


if __name__ == "__main__":
main()
4 changes: 4 additions & 0 deletions ros2_controllers_test_nodes/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[develop]
script-dir=$base/lib/ros2_controllers_test_nodes
[install]
install-scripts=$base/lib/ros2_controllers_test_nodes
57 changes: 57 additions & 0 deletions ros2_controllers_test_nodes/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright 2022 Stogl Robotics Consulting UG (haftungsbeschränkt)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from glob import glob

from setuptools import setup

package_name = "ros2_controllers_test_nodes"

setup(
name=package_name,
version="0.0.1",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
("share/" + package_name, glob("launch/*.launch.py")),
("share/" + package_name + "/configs", glob("configs/*.*")),
],
install_requires=["setuptools"],
zip_safe=True,
author="Denis Štogl",
author_email="[email protected]",
maintainer="Denis Štogl",
maintainer_email="[email protected]",
keywords=["ROS"],
classifiers=[
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python",
"Topic :: Software Development",
],
description="Demo nodes for showing and testing functionalities of ros2_control framework.",
long_description="""\
Demo nodes for showing and testing functionalities of the ros2_control framework.""",
license="Apache License, Version 2.0",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"publisher_forward_position_controller = \
ros2_controllers_test_nodes.publisher_forward_position_controller:main",
"publisher_joint_trajectory_controller = \
ros2_controllers_test_nodes.publisher_joint_trajectory_controller:main",
],
},
)