Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 3 additions & 57 deletions sros2/sros2/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,21 @@
from cryptography.hazmat.backends import default_backend as cryptography_backend
from cryptography.hazmat.primitives import serialization

from lxml import etree

from rclpy.exceptions import InvalidNamespaceException
from rclpy.utilities import get_rmw_implementation_identifier
from rclpy.validate_namespace import validate_namespace

from sros2.policy import (
get_policy_default,
get_transport_schema,
get_transport_template,
load_policy,
)

from . import _keystore, _policy, _utilities
from . import _keystore, _permission, _policy, _utilities

HIDDEN_NODE_PREFIX = '_'

NodeName = namedtuple('NodeName', ('node', 'ns', 'fqn'))
TopicInfo = namedtuple('Topic', ('fqn', 'type'))

RMW_WITH_ROS_GRAPH_INFO_TOPIC = ('rmw_fastrtps_cpp', 'rmw_fastrtps_dynamic_cpp')


def get_node_names(*, node, include_hidden_nodes=False):
node_names_and_namespaces = node.get_node_names_and_namespaces()
Expand Down Expand Up @@ -95,53 +88,6 @@ def is_key_name_valid(name):
return False


def create_permission_file(path, domain_id, policy_element):
print('creating permission')
permissions_xsl_path = get_transport_template('dds', 'permissions.xsl')
permissions_xsl = etree.XSLT(etree.parse(permissions_xsl_path))
permissions_xsd_path = get_transport_schema('dds', 'permissions.xsd')
permissions_xsd = etree.XMLSchema(etree.parse(permissions_xsd_path))

kwargs = {}
if get_rmw_implementation_identifier() in RMW_WITH_ROS_GRAPH_INFO_TOPIC:
kwargs['allow_ros_discovery_topic'] = etree.XSLT.strparam('1')
permissions_xml = permissions_xsl(policy_element, **kwargs)

domain_id_elements = permissions_xml.findall('permissions/grant/*/domains/id')
for domain_id_element in domain_id_elements:
domain_id_element.text = domain_id

try:
permissions_xsd.assertValid(permissions_xml)
except etree.DocumentInvalid as e:
raise RuntimeError(str(e))

with open(path, 'wb') as f:
f.write(etree.tostring(permissions_xml, pretty_print=True))


def create_permission(keystore_path, identity, policy_file_path):
policy_element = _policy.get_policy(identity, policy_file_path)
create_permissions_from_policy_element(keystore_path, identity, policy_element)
return True


def create_permissions_from_policy_element(keystore_path, identity, policy_element):
relative_path = os.path.normpath(identity.lstrip('/'))
key_dir = os.path.join(_keystore.get_keystore_context_dir(keystore_path), relative_path)
print("creating permission file for identity: '%s'" % identity)
permissions_path = os.path.join(key_dir, 'permissions.xml')
create_permission_file(permissions_path, _utilities.domain_id(), policy_element)

signed_permissions_path = os.path.join(key_dir, 'permissions.p7s')
keystore_ca_cert_path = os.path.join(
_keystore.get_keystore_public_dir(keystore_path), 'ca.cert.pem')
keystore_ca_key_path = os.path.join(
_keystore.get_keystore_private_dir(keystore_path), 'ca.key.pem')
_utilities.create_smime_signed_file(
keystore_ca_cert_path, keystore_ca_key_path, permissions_path, signed_permissions_path)


def create_key(keystore_path, identity):
if not _keystore.is_valid_keystore(keystore_path):
print("'%s' is not a valid keystore " % keystore_path)
Expand Down Expand Up @@ -197,7 +143,7 @@ def create_key(keystore_path, identity):
context_element.attrib['path'] = identity

permissions_path = os.path.join(key_dir, 'permissions.xml')
create_permission_file(permissions_path, _utilities.domain_id(), policy_element)
_permission.create_permission_file(permissions_path, _utilities.domain_id(), policy_element)

signed_permissions_path = os.path.join(key_dir, 'permissions.p7s')
keystore_permissions_ca_key_path = os.path.join(
Expand Down Expand Up @@ -258,7 +204,7 @@ def generate_artifacts(keystore_path=None, identity_names=[], policy_files=[]):
if not create_key(keystore_path, identity_name):
return False
policy_element = _policy.get_policy_from_tree(identity_name, policy_tree)
create_permissions_from_policy_element(
_permission.create_permissions_from_policy_element(
keystore_path, identity_name, policy_element)
return True

Expand Down
73 changes: 73 additions & 0 deletions sros2/sros2/api/_permission.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright 2016-2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from lxml import etree

from rclpy.utilities import get_rmw_implementation_identifier

from sros2.policy import get_transport_schema, get_transport_template

from . import _keystore, _policy, _utilities


_RMW_WITH_ROS_GRAPH_INFO_TOPIC = ('rmw_fastrtps_cpp', 'rmw_fastrtps_dynamic_cpp')


def create_permission(keystore_path, identity, policy_file_path):
policy_element = _policy.get_policy(identity, policy_file_path)
create_permissions_from_policy_element(keystore_path, identity, policy_element)
return True


def create_permissions_from_policy_element(keystore_path, identity, policy_element):
relative_path = os.path.normpath(identity.lstrip('/'))
key_dir = os.path.join(_keystore.keystore_context_dir(keystore_path), relative_path)
print("creating permission file for identity: '%s'" % identity)
permissions_path = os.path.join(key_dir, 'permissions.xml')
create_permission_file(permissions_path, _utilities.domain_id(), policy_element)

signed_permissions_path = os.path.join(key_dir, 'permissions.p7s')
keystore_ca_cert_path = os.path.join(
_keystore.keystore_public_dir(keystore_path), 'ca.cert.pem')
keystore_ca_key_path = os.path.join(
_keystore.keystore_private_dir(keystore_path), 'ca.key.pem')
_utilities.create_smime_signed_file(
keystore_ca_cert_path, keystore_ca_key_path, permissions_path, signed_permissions_path)


def create_permission_file(path, domain_id, policy_element):
print('creating permission')
permissions_xsl_path = get_transport_template('dds', 'permissions.xsl')
permissions_xsl = etree.XSLT(etree.parse(permissions_xsl_path))
permissions_xsd_path = get_transport_schema('dds', 'permissions.xsd')
permissions_xsd = etree.XMLSchema(etree.parse(permissions_xsd_path))

kwargs = {}
if get_rmw_implementation_identifier() in _RMW_WITH_ROS_GRAPH_INFO_TOPIC:
kwargs['allow_ros_discovery_topic'] = etree.XSLT.strparam('1')
permissions_xml = permissions_xsl(policy_element, **kwargs)

domain_id_elements = permissions_xml.findall('permissions/grant/*/domains/id')
for domain_id_element in domain_id_elements:
domain_id_element.text = domain_id

try:
permissions_xsd.assertValid(permissions_xml)
except etree.DocumentInvalid as e:
raise RuntimeError(str(e))

with open(path, 'wb') as f:
f.write(etree.tostring(permissions_xml, pretty_print=True))
4 changes: 2 additions & 2 deletions sros2/sros2/verb/create_permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def DirectoriesCompleter():
def FilesCompleter(*, allowednames, directories):
return None

from sros2.api import create_permission
from sros2.api import _permission
from sros2.verb import VerbExtension


Expand All @@ -41,7 +41,7 @@ def add_arguments(self, parser, cli_name):

def main(self, *, args):
try:
success = create_permission(args.ROOT, args.NAME, args.POLICY_FILE_PATH)
success = _permission.create_permission(args.ROOT, args.NAME, args.POLICY_FILE_PATH)
except FileNotFoundError as e:
raise RuntimeError(str(e))
return 0 if success else 1