Skip to content

Commit

Permalink
Fix the SIGTERM handling in the ros2 daemon. (#887)
Browse files Browse the repository at this point in the history
* Fix the SIGTERM handling in the ros2 daemon.

There are 2 fixes needed here:
1.  Make sure to check rclpy.ok() in the overall loop
for handling requests.  That's because rclpy is handling
the signals by default.
2.  Realize that server.handle_request() uses the set
timeout as a timeout to select(), essentially.  Because of
that, we instead set the timeout to a short value (200 milliseconds),
and do the overall timeout checking ourselves.

Note that because we are waking up more often, this will cause
the ros2 daemon to use more CPU than before.  But it should
be negligible overall.

Signed-off-by: Chris Lalancette <[email protected]>
Co-authored-by: Scott K Logan <[email protected]>
  • Loading branch information
clalancette and cottsay authored Mar 8, 2024
1 parent d3cfbd7 commit b16fb02
Showing 1 changed file with 78 additions and 59 deletions.
137 changes: 78 additions & 59 deletions ros2cli/ros2cli/daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import argparse
import os
import time
import uuid

import rclpy
Expand Down Expand Up @@ -52,7 +53,7 @@ def get_xmlrpc_server_url(address=None):
return f'http://{host}:{port}{path}'


def make_xmlrpc_server():
def make_xmlrpc_server() -> LocalXMLRPCServer:
"""Make local XMLRPC server listening over ros2cli daemon's default port."""
address = get_address()

Expand All @@ -63,74 +64,91 @@ def make_xmlrpc_server():
)


def serve(server, *, timeout=2 * 60 * 60):
def serve(server: LocalXMLRPCServer, *, timeout: int = 2 * 60 * 60):
"""
Serve the ros2cli daemon API using the given `server`.
:param server: an XMLRPC server instance
:param timeout: how long to wait before shutting
down the server due to inactivity.
"""
try:
ros_domain_id = get_ros_domain_id()
node_args = argparse.Namespace(
node_name_suffix=f'_daemon_{ros_domain_id}_{uuid.uuid4().hex}',
start_parameter_services=False,
start_type_description_service=False)
with NetworkAwareNode(node_args) as node:
functions = [
node.get_name,
node.get_namespace,
node.get_node_names_and_namespaces,
node.get_node_names_and_namespaces_with_enclaves,
node.get_topic_names_and_types,
node.get_service_names_and_types,
bind(rclpy.action.get_action_names_and_types, node),
node.get_publisher_names_and_types_by_node,
node.get_publishers_info_by_topic,
node.get_subscriber_names_and_types_by_node,
node.get_subscriptions_info_by_topic,
node.get_service_names_and_types_by_node,
node.get_client_names_and_types_by_node,
bind(rclpy.action.get_action_server_names_and_types_by_node, node),
bind(rclpy.action.get_action_client_names_and_types_by_node, node),
node.count_publishers,
node.count_subscribers,
node.count_clients,
node.count_services
]

server.register_introspection_functions()
for func in functions:
server.register_function(
before_invocation(
func, pretty_print_call))

shutdown = False

# shutdown the daemon in case of a timeout
def timeout_handler():
nonlocal shutdown
ros_domain_id = get_ros_domain_id()
node_args = argparse.Namespace(
node_name_suffix=f'_daemon_{ros_domain_id}_{uuid.uuid4().hex}',
start_parameter_services=False,
start_type_description_service=False)
with NetworkAwareNode(node_args) as node:
functions = [
node.get_name,
node.get_namespace,
node.get_node_names_and_namespaces,
node.get_node_names_and_namespaces_with_enclaves,
node.get_topic_names_and_types,
node.get_service_names_and_types,
bind(rclpy.action.get_action_names_and_types, node),
node.get_publisher_names_and_types_by_node,
node.get_publishers_info_by_topic,
node.get_subscriber_names_and_types_by_node,
node.get_subscriptions_info_by_topic,
node.get_service_names_and_types_by_node,
node.get_client_names_and_types_by_node,
bind(rclpy.action.get_action_server_names_and_types_by_node, node),
bind(rclpy.action.get_action_client_names_and_types_by_node, node),
node.count_publishers,
node.count_subscribers,
node.count_clients,
node.count_services
]

# Dealing with the timeouts in this server is a bit tricky. The caller
# passes in an overall inactivity timeout via the 'timeout' parameter;
# this server should quit when there is no activity within that timeout.
# The 'server.timeout' specifies how long 'server.handle_request()'
# should wait in 'select()' before returning with no work to do. We set
# the 'server.timeout' to 200 milliseconds so we will react fairly
# quickly to external signals and quit. To deal with the overall
# timeout, each function that is called resets the timer. The result
# of all of this is that we properly react to signals (SIGINT and
# SIGTERM), we properly timeout when we are idle, and we properly quit
# when we are told to shutdown.

last_function_call_time = time.time()

def reset_timer_and_pretty_print(func, *args, **kwargs):
nonlocal last_function_call_time
last_function_call_time = time.time()
pretty_print_call(func, args, kwargs)

server.register_introspection_functions()
for func in functions:
server.register_function(
before_invocation(
func, reset_timer_and_pretty_print))

shutdown = False

def timeout_handler():
nonlocal shutdown

if time.time() - last_function_call_time > timeout:
print('Shutdown due to timeout')
shutdown = True
server.handle_timeout = timeout_handler
server.timeout = timeout
server.handle_timeout = timeout_handler
server.timeout = 0.2

# function to shutdown daemon remotely
def shutdown_handler():
nonlocal shutdown
print('Remote shutdown requested')
shutdown = True
server.register_function(shutdown_handler, 'system.shutdown')
# function to shutdown daemon remotely
def shutdown_handler():
nonlocal shutdown
print('Remote shutdown requested')
shutdown = True
server.register_function(shutdown_handler, 'system.shutdown')

print('Serving XML-RPC on ' + get_xmlrpc_server_url(server.server_address))
try:
while not shutdown:
server.handle_request()
except KeyboardInterrupt:
pass
finally:
server.server_close()
print('Serving XML-RPC on ' + get_xmlrpc_server_url(server.server_address))
try:
while rclpy.ok() and not shutdown:
server.handle_request()
except KeyboardInterrupt:
pass


def main(*, argv=None):
Expand All @@ -153,7 +171,8 @@ def main(*, argv=None):
assert args.rmw_implementation == rclpy.get_rmw_implementation_identifier()
assert args.ros_domain_id == get_ros_domain_id()

serve(make_xmlrpc_server(), timeout=args.timeout)
with make_xmlrpc_server() as server:
serve(server, timeout=args.timeout)


if __name__ == '__main__':
Expand Down

0 comments on commit b16fb02

Please sign in to comment.