diff --git a/sros2/sros2/api/__init__.py b/sros2/sros2/api/__init__.py index afe6bbfb..8a477674 100644 --- a/sros2/sros2/api/__init__.py +++ b/sros2/sros2/api/__init__.py @@ -21,28 +21,21 @@ from cryptography.hazmat.backends import default_backend as cryptography_backend from cryptography.hazmat.primitives import serialization -from lxml import etree - from rclpy.exceptions import InvalidNamespaceException -from rclpy.utilities import get_rmw_implementation_identifier from rclpy.validate_namespace import validate_namespace from sros2.policy import ( get_policy_default, - get_transport_schema, - get_transport_template, load_policy, ) -from . import _keystore, _policy, _utilities +from . import _keystore, _permission, _policy, _utilities HIDDEN_NODE_PREFIX = '_' NodeName = namedtuple('NodeName', ('node', 'ns', 'fqn')) TopicInfo = namedtuple('Topic', ('fqn', 'type')) -RMW_WITH_ROS_GRAPH_INFO_TOPIC = ('rmw_fastrtps_cpp', 'rmw_fastrtps_dynamic_cpp') - def get_node_names(*, node, include_hidden_nodes=False): node_names_and_namespaces = node.get_node_names_and_namespaces() @@ -95,53 +88,6 @@ def is_key_name_valid(name): return False -def create_permission_file(path, domain_id, policy_element): - print('creating permission') - permissions_xsl_path = get_transport_template('dds', 'permissions.xsl') - permissions_xsl = etree.XSLT(etree.parse(permissions_xsl_path)) - permissions_xsd_path = get_transport_schema('dds', 'permissions.xsd') - permissions_xsd = etree.XMLSchema(etree.parse(permissions_xsd_path)) - - kwargs = {} - if get_rmw_implementation_identifier() in RMW_WITH_ROS_GRAPH_INFO_TOPIC: - kwargs['allow_ros_discovery_topic'] = etree.XSLT.strparam('1') - permissions_xml = permissions_xsl(policy_element, **kwargs) - - domain_id_elements = permissions_xml.findall('permissions/grant/*/domains/id') - for domain_id_element in domain_id_elements: - domain_id_element.text = domain_id - - try: - permissions_xsd.assertValid(permissions_xml) - except etree.DocumentInvalid as e: - raise RuntimeError(str(e)) - - with open(path, 'wb') as f: - f.write(etree.tostring(permissions_xml, pretty_print=True)) - - -def create_permission(keystore_path, identity, policy_file_path): - policy_element = _policy.get_policy(identity, policy_file_path) - create_permissions_from_policy_element(keystore_path, identity, policy_element) - return True - - -def create_permissions_from_policy_element(keystore_path, identity, policy_element): - relative_path = os.path.normpath(identity.lstrip('/')) - key_dir = os.path.join(_keystore.get_keystore_context_dir(keystore_path), relative_path) - print("creating permission file for identity: '%s'" % identity) - permissions_path = os.path.join(key_dir, 'permissions.xml') - create_permission_file(permissions_path, _utilities.domain_id(), policy_element) - - signed_permissions_path = os.path.join(key_dir, 'permissions.p7s') - keystore_ca_cert_path = os.path.join( - _keystore.get_keystore_public_dir(keystore_path), 'ca.cert.pem') - keystore_ca_key_path = os.path.join( - _keystore.get_keystore_private_dir(keystore_path), 'ca.key.pem') - _utilities.create_smime_signed_file( - keystore_ca_cert_path, keystore_ca_key_path, permissions_path, signed_permissions_path) - - def create_key(keystore_path, identity): if not _keystore.is_valid_keystore(keystore_path): print("'%s' is not a valid keystore " % keystore_path) @@ -197,7 +143,7 @@ def create_key(keystore_path, identity): context_element.attrib['path'] = identity permissions_path = os.path.join(key_dir, 'permissions.xml') - create_permission_file(permissions_path, _utilities.domain_id(), policy_element) + _permission.create_permission_file(permissions_path, _utilities.domain_id(), policy_element) signed_permissions_path = os.path.join(key_dir, 'permissions.p7s') keystore_permissions_ca_key_path = os.path.join( @@ -258,7 +204,7 @@ def generate_artifacts(keystore_path=None, identity_names=[], policy_files=[]): if not create_key(keystore_path, identity_name): return False policy_element = _policy.get_policy_from_tree(identity_name, policy_tree) - create_permissions_from_policy_element( + _permission.create_permissions_from_policy_element( keystore_path, identity_name, policy_element) return True diff --git a/sros2/sros2/api/_permission.py b/sros2/sros2/api/_permission.py new file mode 100644 index 00000000..5b590225 --- /dev/null +++ b/sros2/sros2/api/_permission.py @@ -0,0 +1,73 @@ +# Copyright 2016-2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from lxml import etree + +from rclpy.utilities import get_rmw_implementation_identifier + +from sros2.policy import get_transport_schema, get_transport_template + +from . import _keystore, _policy, _utilities + + +_RMW_WITH_ROS_GRAPH_INFO_TOPIC = ('rmw_fastrtps_cpp', 'rmw_fastrtps_dynamic_cpp') + + +def create_permission(keystore_path, identity, policy_file_path): + policy_element = _policy.get_policy(identity, policy_file_path) + create_permissions_from_policy_element(keystore_path, identity, policy_element) + return True + + +def create_permissions_from_policy_element(keystore_path, identity, policy_element): + relative_path = os.path.normpath(identity.lstrip('/')) + key_dir = os.path.join(_keystore.keystore_context_dir(keystore_path), relative_path) + print("creating permission file for identity: '%s'" % identity) + permissions_path = os.path.join(key_dir, 'permissions.xml') + create_permission_file(permissions_path, _utilities.domain_id(), policy_element) + + signed_permissions_path = os.path.join(key_dir, 'permissions.p7s') + keystore_ca_cert_path = os.path.join( + _keystore.keystore_public_dir(keystore_path), 'ca.cert.pem') + keystore_ca_key_path = os.path.join( + _keystore.keystore_private_dir(keystore_path), 'ca.key.pem') + _utilities.create_smime_signed_file( + keystore_ca_cert_path, keystore_ca_key_path, permissions_path, signed_permissions_path) + + +def create_permission_file(path, domain_id, policy_element): + print('creating permission') + permissions_xsl_path = get_transport_template('dds', 'permissions.xsl') + permissions_xsl = etree.XSLT(etree.parse(permissions_xsl_path)) + permissions_xsd_path = get_transport_schema('dds', 'permissions.xsd') + permissions_xsd = etree.XMLSchema(etree.parse(permissions_xsd_path)) + + kwargs = {} + if get_rmw_implementation_identifier() in _RMW_WITH_ROS_GRAPH_INFO_TOPIC: + kwargs['allow_ros_discovery_topic'] = etree.XSLT.strparam('1') + permissions_xml = permissions_xsl(policy_element, **kwargs) + + domain_id_elements = permissions_xml.findall('permissions/grant/*/domains/id') + for domain_id_element in domain_id_elements: + domain_id_element.text = domain_id + + try: + permissions_xsd.assertValid(permissions_xml) + except etree.DocumentInvalid as e: + raise RuntimeError(str(e)) + + with open(path, 'wb') as f: + f.write(etree.tostring(permissions_xml, pretty_print=True)) diff --git a/sros2/sros2/verb/create_permission.py b/sros2/sros2/verb/create_permission.py index 5d84de2b..2f656d69 100644 --- a/sros2/sros2/verb/create_permission.py +++ b/sros2/sros2/verb/create_permission.py @@ -23,7 +23,7 @@ def DirectoriesCompleter(): def FilesCompleter(*, allowednames, directories): return None -from sros2.api import create_permission +from sros2.api import _permission from sros2.verb import VerbExtension @@ -41,7 +41,7 @@ def add_arguments(self, parser, cli_name): def main(self, *, args): try: - success = create_permission(args.ROOT, args.NAME, args.POLICY_FILE_PATH) + success = _permission.create_permission(args.ROOT, args.NAME, args.POLICY_FILE_PATH) except FileNotFoundError as e: raise RuntimeError(str(e)) return 0 if success else 1