diff --git a/sros2/sros2/api/__init__.py b/sros2/sros2/api/__init__.py index 0c9424f3..43f61f2f 100644 --- a/sros2/sros2/api/__init__.py +++ b/sros2/sros2/api/__init__.py @@ -13,17 +13,13 @@ # limitations under the License. from collections import namedtuple -import datetime import errno import os import sys from cryptography import x509 from cryptography.hazmat.backends import default_backend as cryptography_backend -from cryptography.hazmat.bindings.openssl.binding import Binding as SSLBinding -from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import ec from lxml import etree @@ -33,24 +29,18 @@ from sros2.policy import ( get_policy_default, - get_transport_default, get_transport_schema, get_transport_template, load_policy, ) -HIDDEN_NODE_PREFIX = '_' -DOMAIN_ID_ENV = 'ROS_DOMAIN_ID' +from . import _keystore, _utilities -_DEFAULT_COMMON_NAME = 'sros2testCA' +HIDDEN_NODE_PREFIX = '_' NodeName = namedtuple('NodeName', ('node', 'ns', 'fqn')) TopicInfo = namedtuple('Topic', ('fqn', 'type')) -KS_CONTEXT = 'contexts' -KS_PUBLIC = 'public' -KS_PRIVATE = 'private' - RMW_WITH_ROS_GRAPH_INFO_TOPIC = ('rmw_fastrtps_cpp', 'rmw_fastrtps_dynamic_cpp') @@ -94,143 +84,6 @@ def get_client_info(node, node_name): return get_topics(node_name, node.get_client_names_and_types_by_node) -def _write_key( - key, - key_path, - *, - encoding=serialization.Encoding.PEM, - serialization_format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() -): - with open(key_path, 'wb') as f: - f.write(key.private_bytes( - encoding=encoding, - format=serialization_format, - encryption_algorithm=encryption_algorithm)) - - -def _write_cert(cert, cert_path, *, encoding=serialization.Encoding.PEM): - with open(cert_path, 'wb') as f: - f.write(cert.public_bytes(encoding=encoding)) - - -def create_ca_key_cert(ca_key_out_path, ca_cert_out_path): - cert, private_key = _build_key_and_cert( - x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, _DEFAULT_COMMON_NAME)]), - ca=True) - - _write_key(private_key, ca_key_out_path) - _write_cert(cert, ca_cert_out_path) - - -def create_governance_file(path, domain_id): - # for this application we are only looking to authenticate and encrypt; - # we do not need/want access control at this point. - governance_xml_path = get_transport_default('dds', 'governance.xml') - governance_xml = etree.parse(governance_xml_path) - - governance_xsd_path = get_transport_schema('dds', 'governance.xsd') - governance_xsd = etree.XMLSchema(etree.parse(governance_xsd_path)) - - domain_id_elements = governance_xml.findall( - 'domain_access_rules/domain_rule/domains/id') - for domain_id_element in domain_id_elements: - domain_id_element.text = domain_id - - try: - governance_xsd.assertValid(governance_xml) - except etree.DocumentInvalid as e: - raise RuntimeError(str(e)) - - with open(path, 'wb') as f: - f.write(etree.tostring(governance_xml, pretty_print=True)) - - -def _create_symlink(*, src, dst): - if os.path.exists(dst): - src_abs_path = os.path.join(os.path.dirname(dst), src) - if os.path.samefile(src_abs_path, dst): - return - print(f"Existing symlink '{dst}' does not match '{src_abs_path}', overriding it!") - os.remove(dst) - os.symlink(src=src, dst=dst) - - -def create_keystore(keystore_path): - if not is_valid_keystore(keystore_path): - print('creating keystore: %s' % keystore_path) - else: - print('keystore already exists: %s' % keystore_path) - return - - os.makedirs(keystore_path, exist_ok=True) - os.makedirs(os.path.join(keystore_path, KS_PUBLIC), exist_ok=True) - os.makedirs(os.path.join(keystore_path, KS_PRIVATE), exist_ok=True) - os.makedirs(os.path.join(keystore_path, KS_CONTEXT), exist_ok=True) - - keystore_ca_cert_path = os.path.join(keystore_path, KS_PUBLIC, 'ca.cert.pem') - keystore_ca_key_path = os.path.join(keystore_path, KS_PRIVATE, 'ca.key.pem') - - keystore_permissions_ca_cert_path = os.path.join( - keystore_path, KS_PUBLIC, 'permissions_ca.cert.pem') - keystore_permissions_ca_key_path = os.path.join( - keystore_path, KS_PRIVATE, 'permissions_ca.key.pem') - keystore_identity_ca_cert_path = os.path.join(keystore_path, KS_PUBLIC, 'identity_ca.cert.pem') - keystore_identity_ca_key_path = os.path.join(keystore_path, KS_PRIVATE, 'identity_ca.key.pem') - - required_files = ( - keystore_permissions_ca_cert_path, - keystore_permissions_ca_key_path, - keystore_identity_ca_cert_path, - keystore_identity_ca_key_path, - ) - - if not all(os.path.isfile(x) for x in required_files): - print('creating new CA key/cert pair') - create_ca_key_cert(keystore_ca_key_path, keystore_ca_cert_path) - _create_symlink(src='ca.cert.pem', dst=keystore_permissions_ca_cert_path) - _create_symlink(src='ca.key.pem', dst=keystore_permissions_ca_key_path) - _create_symlink(src='ca.cert.pem', dst=keystore_identity_ca_cert_path) - _create_symlink(src='ca.key.pem', dst=keystore_identity_ca_key_path) - else: - print('found CA key and cert, not creating new ones!') - - # create governance file - gov_path = os.path.join(keystore_path, KS_CONTEXT, 'governance.xml') - if not os.path.isfile(gov_path): - print('creating governance file: %s' % gov_path) - domain_id = os.getenv(DOMAIN_ID_ENV, '0') - create_governance_file(gov_path, domain_id) - else: - print('found governance file, not creating a new one!') - - # sign governance file - signed_gov_path = os.path.join(keystore_path, KS_CONTEXT, 'governance.p7s') - if not os.path.isfile(signed_gov_path): - print('creating signed governance file: %s' % signed_gov_path) - _create_smime_signed_file( - keystore_permissions_ca_cert_path, - keystore_permissions_ca_key_path, - gov_path, - signed_gov_path) - else: - print('found signed governance file, not creating a new one!') - - print('all done! enjoy your keystore in %s' % keystore_path) - print('cheers!') - return True - - -def is_valid_keystore(path): - return ( - os.path.isfile(os.path.join(path, KS_PUBLIC, 'permissions_ca.cert.pem')) and - os.path.isfile(os.path.join(path, KS_PUBLIC, 'identity_ca.cert.pem')) and - os.path.isfile(os.path.join(path, KS_PRIVATE, 'permissions_ca.key.pem')) and - os.path.isfile(os.path.join(path, KS_PRIVATE, 'identity_ca.key.pem')) and - os.path.isfile(os.path.join(path, KS_CONTEXT, 'governance.p7s')) - ) - - def is_key_name_valid(name): # TODO(ivanpauno): Use validate_security_context_name when it's propagated to `rclpy`. # This is not to bad for the moment. @@ -291,22 +144,23 @@ def create_permission(keystore_path, identity, policy_file_path): def create_permissions_from_policy_element(keystore_path, identity, policy_element): - domain_id = os.getenv(DOMAIN_ID_ENV, '0') relative_path = os.path.normpath(identity.lstrip('/')) - key_dir = os.path.join(keystore_path, KS_CONTEXT, relative_path) + key_dir = os.path.join(_keystore.get_keystore_context_dir(keystore_path), relative_path) print("creating permission file for identity: '%s'" % identity) permissions_path = os.path.join(key_dir, 'permissions.xml') - create_permission_file(permissions_path, domain_id, policy_element) + create_permission_file(permissions_path, _utilities.domain_id(), policy_element) signed_permissions_path = os.path.join(key_dir, 'permissions.p7s') - keystore_ca_cert_path = os.path.join(keystore_path, KS_PUBLIC, 'ca.cert.pem') - keystore_ca_key_path = os.path.join(keystore_path, KS_PRIVATE, 'ca.key.pem') - _create_smime_signed_file( + keystore_ca_cert_path = os.path.join( + _keystore.get_keystore_public_dir(keystore_path), 'ca.cert.pem') + keystore_ca_key_path = os.path.join( + _keystore.get_keystore_private_dir(keystore_path), 'ca.key.pem') + _utilities.create_smime_signed_file( keystore_ca_cert_path, keystore_ca_key_path, permissions_path, signed_permissions_path) def create_key(keystore_path, identity): - if not is_valid_keystore(keystore_path): + if not _keystore.is_valid_keystore(keystore_path): print("'%s' is not a valid keystore " % keystore_path) return False if not is_key_name_valid(identity): @@ -314,25 +168,29 @@ def create_key(keystore_path, identity): print("creating key for identity: '%s'" % identity) relative_path = os.path.normpath(identity.lstrip('/')) - key_dir = os.path.join(keystore_path, KS_CONTEXT, relative_path) + key_dir = os.path.join(_keystore.get_keystore_context_dir(keystore_path), relative_path) os.makedirs(key_dir, exist_ok=True) # symlink the CA cert in there public_certs = ['identity_ca.cert.pem', 'permissions_ca.cert.pem'] for public_cert in public_certs: dst = os.path.join(key_dir, public_cert) - keystore_ca_cert_path = os.path.join(keystore_path, KS_PUBLIC, public_cert) + keystore_ca_cert_path = os.path.join( + _keystore.get_keystore_public_dir(keystore_path), public_cert) relativepath = os.path.relpath(keystore_ca_cert_path, key_dir) - _create_symlink(src=relativepath, dst=dst) + _utilities.create_symlink(src=relativepath, dst=dst) # symlink the governance file in there - keystore_governance_path = os.path.join(keystore_path, KS_CONTEXT, 'governance.p7s') + keystore_governance_path = os.path.join( + _keystore.get_keystore_context_dir(keystore_path), 'governance.p7s') dest_governance_path = os.path.join(key_dir, 'governance.p7s') relativepath = os.path.relpath(keystore_governance_path, key_dir) - _create_symlink(src=relativepath, dst=dest_governance_path) + _utilities.create_symlink(src=relativepath, dst=dest_governance_path) - keystore_identity_ca_cert_path = os.path.join(keystore_path, KS_PUBLIC, 'identity_ca.cert.pem') - keystore_identity_ca_key_path = os.path.join(keystore_path, KS_PRIVATE, 'identity_ca.key.pem') + keystore_identity_ca_cert_path = os.path.join( + _keystore.get_keystore_public_dir(keystore_path), 'identity_ca.cert.pem') + keystore_identity_ca_key_path = os.path.join( + _keystore.get_keystore_private_dir(keystore_path), 'identity_ca.key.pem') cert_path = os.path.join(key_dir, 'cert.pem') key_path = os.path.join(key_dir, 'key.pem') @@ -356,13 +214,12 @@ def create_key(keystore_path, identity): context_element.attrib['path'] = identity permissions_path = os.path.join(key_dir, 'permissions.xml') - domain_id = os.getenv(DOMAIN_ID_ENV, '0') - create_permission_file(permissions_path, domain_id, policy_element) + create_permission_file(permissions_path, _utilities.domain_id(), policy_element) signed_permissions_path = os.path.join(key_dir, 'permissions.p7s') keystore_permissions_ca_key_path = os.path.join( - keystore_path, KS_PRIVATE, 'permissions_ca.key.pem') - _create_smime_signed_file( + _keystore.get_keystore_private_dir(keystore_path), 'permissions_ca.key.pem') + _utilities.create_smime_signed_file( keystore_ca_cert_path, keystore_permissions_ca_key_path, permissions_path, @@ -373,7 +230,7 @@ def create_key(keystore_path, identity): def list_keys(keystore_path): - contexts_path = os.path.join(keystore_path, KS_CONTEXT) + contexts_path = _keystore.get_keystore_context_dir(keystore_path) if not os.path.isdir(keystore_path): raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), keystore_path) if not os.path.isdir(contexts_path): @@ -401,9 +258,9 @@ def generate_artifacts(keystore_path=None, identity_names=[], policy_files=[]): keystore_path = get_keystore_path_from_env() if keystore_path is None: return False - if not is_valid_keystore(keystore_path): + if not _keystore.is_valid_keystore(keystore_path): print('%s is not a valid keystore, creating new keystore' % keystore_path) - create_keystore(keystore_path) + _keystore.create_keystore(keystore_path) # create keys for all provided identities for identity in identity_names: @@ -423,101 +280,6 @@ def generate_artifacts(keystore_path=None, identity_names=[], policy_files=[]): return True -def _sign_bytes(cert, key, byte_string): - # Using two flags here to get the output required: - # - PKCS7_DETACHED: Use cleartext signing - # - PKCS7_TEXT: Set the MIME headers for text/plain - flags = SSLBinding.lib.PKCS7_DETACHED - flags |= SSLBinding.lib.PKCS7_TEXT - - # Convert the byte string into a buffer for SSL - bio_in = SSLBinding.lib.BIO_new_mem_buf(byte_string, len(byte_string)) - try: - pkcs7 = SSLBinding.lib.PKCS7_sign( - cert._x509, key._evp_pkey, SSLBinding.ffi.NULL, bio_in, flags) - finally: - # Free the memory allocated for the buffer - SSLBinding.lib.BIO_free(bio_in) - - # PKCS7_sign consumes the buffer; allocate a new one again to get it into the final document - bio_in = SSLBinding.lib.BIO_new_mem_buf(byte_string, len(byte_string)) - try: - # Allocate a buffer for the output document - bio_out = SSLBinding.lib.BIO_new(SSLBinding.lib.BIO_s_mem()) - try: - # Write the final document out to the buffer - SSLBinding.lib.SMIME_write_PKCS7(bio_out, pkcs7, bio_in, flags) - - # Copy the output document back to python-managed memory - result_buffer = SSLBinding.ffi.new('char**') - buffer_length = SSLBinding.lib.BIO_get_mem_data(bio_out, result_buffer) - output = SSLBinding.ffi.buffer(result_buffer[0], buffer_length)[:] - finally: - # Free the memory required for the output buffer - SSLBinding.lib.BIO_free(bio_out) - finally: - # Free the memory allocated for the input buffer - SSLBinding.lib.BIO_free(bio_in) - - return output - - -def _create_smime_signed_file(cert_path, key_path, unsigned_file_path, signed_file_path): - # Load the CA cert and key from disk - with open(cert_path, 'rb') as cert_file: - cert = x509.load_pem_x509_certificate( - cert_file.read(), cryptography_backend()) - - with open(key_path, 'rb') as key_file: - private_key = serialization.load_pem_private_key( - key_file.read(), None, cryptography_backend()) - - # Get the contents of the unsigned file, which we're about to sign - with open(unsigned_file_path, 'rb') as f: - content = f.read() - - # Sign the contents, and write the result to the appropriate place - with open(signed_file_path, 'wb') as f: - f.write(_sign_bytes(cert, private_key, content)) - - -def _build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''): - if not issuer_name: - issuer_name = subject_name - - # DDS-Security section 9.3.1 calls for prime256v1, for which SECP256R1 is an alias - private_key = ec.generate_private_key(ec.SECP256R1, cryptography_backend()) - if not ca_key: - ca_key = private_key - - if ca: - extension = x509.BasicConstraints(ca=True, path_length=1) - else: - extension = x509.BasicConstraints(ca=False, path_length=None) - - utcnow = datetime.datetime.utcnow() - builder = x509.CertificateBuilder( - ).issuer_name( - issuer_name - ).serial_number( - x509.random_serial_number() - ).not_valid_before( - utcnow - ).not_valid_after( - # TODO: This should not be hard-coded - utcnow + datetime.timedelta(days=3650) - ).public_key( - private_key.public_key() - ).subject_name( - subject_name - ).add_extension( - extension, critical=ca - ) - cert = builder.sign(ca_key, hashes.SHA256(), cryptography_backend()) - - return (cert, private_key) - - def _create_key_and_cert( keystore_ca_cert_path, keystore_ca_key_path, identity, cert_path, key_path): # Load the CA cert and key from disk @@ -527,10 +289,10 @@ def _create_key_and_cert( with open(keystore_ca_key_path, 'rb') as f: ca_key = serialization.load_pem_private_key(f.read(), None, cryptography_backend()) - cert, private_key = _build_key_and_cert( + cert, private_key = _utilities.build_key_and_cert( x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, identity)]), issuer_name=ca_cert.subject, ca_key=ca_key) - _write_key(private_key, key_path) - _write_cert(cert, cert_path) + _utilities.write_key(private_key, key_path) + _utilities.write_cert(cert, cert_path) diff --git a/sros2/sros2/api/_keystore.py b/sros2/sros2/api/_keystore.py new file mode 100644 index 00000000..3f75176c --- /dev/null +++ b/sros2/sros2/api/_keystore.py @@ -0,0 +1,150 @@ +# Copyright 2020 Canonical Ltd +# Copyright 2016-2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from cryptography import x509 + +import lxml + +from sros2.policy import get_transport_default, get_transport_schema + +from . import _utilities + + +_KS_CONTEXT = 'contexts' +_KS_PUBLIC = 'public' +_KS_PRIVATE = 'private' +_DEFAULT_COMMON_NAME = 'sros2testCA' + + +def create_keystore(keystore_path): + if not is_valid_keystore(keystore_path): + print('creating keystore: %s' % keystore_path) + else: + print('keystore already exists: %s' % keystore_path) + return + + os.makedirs(keystore_path, exist_ok=True) + os.makedirs(os.path.join(keystore_path, _KS_PUBLIC), exist_ok=True) + os.makedirs(os.path.join(keystore_path, _KS_PRIVATE), exist_ok=True) + os.makedirs(os.path.join(keystore_path, _KS_CONTEXT), exist_ok=True) + + keystore_ca_cert_path = os.path.join(keystore_path, _KS_PUBLIC, 'ca.cert.pem') + keystore_ca_key_path = os.path.join(keystore_path, _KS_PRIVATE, 'ca.key.pem') + + keystore_permissions_ca_cert_path = os.path.join( + keystore_path, _KS_PUBLIC, 'permissions_ca.cert.pem') + keystore_permissions_ca_key_path = os.path.join( + keystore_path, _KS_PRIVATE, 'permissions_ca.key.pem') + keystore_identity_ca_cert_path = os.path.join( + keystore_path, _KS_PUBLIC, 'identity_ca.cert.pem') + keystore_identity_ca_key_path = os.path.join( + keystore_path, _KS_PRIVATE, 'identity_ca.key.pem') + + required_files = ( + keystore_permissions_ca_cert_path, + keystore_permissions_ca_key_path, + keystore_identity_ca_cert_path, + keystore_identity_ca_key_path, + ) + + if not all(os.path.isfile(x) for x in required_files): + print('creating new CA key/cert pair') + _create_ca_key_cert(keystore_ca_key_path, keystore_ca_cert_path) + _utilities.create_symlink(src='ca.cert.pem', dst=keystore_permissions_ca_cert_path) + _utilities.create_symlink(src='ca.key.pem', dst=keystore_permissions_ca_key_path) + _utilities.create_symlink(src='ca.cert.pem', dst=keystore_identity_ca_cert_path) + _utilities.create_symlink(src='ca.key.pem', dst=keystore_identity_ca_key_path) + else: + print('found CA key and cert, not creating new ones!') + + # create governance file + gov_path = os.path.join(keystore_path, _KS_CONTEXT, 'governance.xml') + if not os.path.isfile(gov_path): + print('creating governance file: %s' % gov_path) + _create_governance_file(gov_path, _utilities.domain_id()) + else: + print('found governance file, not creating a new one!') + + # sign governance file + signed_gov_path = os.path.join(keystore_path, _KS_CONTEXT, 'governance.p7s') + if not os.path.isfile(signed_gov_path): + print('creating signed governance file: %s' % signed_gov_path) + _utilities.create_smime_signed_file( + keystore_permissions_ca_cert_path, + keystore_permissions_ca_key_path, + gov_path, + signed_gov_path) + else: + print('found signed governance file, not creating a new one!') + + print('all done! enjoy your keystore in %s' % keystore_path) + print('cheers!') + return True + + +def is_valid_keystore(path): + return ( + os.path.isfile(os.path.join(path, _KS_PUBLIC, 'permissions_ca.cert.pem')) and + os.path.isfile(os.path.join(path, _KS_PUBLIC, 'identity_ca.cert.pem')) and + os.path.isfile(os.path.join(path, _KS_PRIVATE, 'permissions_ca.key.pem')) and + os.path.isfile(os.path.join(path, _KS_PRIVATE, 'identity_ca.key.pem')) and + os.path.isfile(os.path.join(path, _KS_CONTEXT, 'governance.p7s')) + ) + + +def get_keystore_context_dir(keystore_path: str) -> str: + return os.path.join(keystore_path, _KS_CONTEXT) + + +def get_keystore_public_dir(keystore_path: str) -> str: + return os.path.join(keystore_path, _KS_PUBLIC) + + +def get_keystore_private_dir(keystore_path: str) -> str: + return os.path.join(keystore_path, _KS_PRIVATE) + + +def _create_ca_key_cert(ca_key_out_path, ca_cert_out_path): + cert, private_key = _utilities.build_key_and_cert( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, _DEFAULT_COMMON_NAME)]), + ca=True) + + _utilities.write_key(private_key, ca_key_out_path) + _utilities.write_cert(cert, ca_cert_out_path) + + +def _create_governance_file(path, domain_id): + # for this application we are only looking to authenticate and encrypt; + # we do not need/want access control at this point. + governance_xml_path = get_transport_default('dds', 'governance.xml') + governance_xml = lxml.etree.parse(governance_xml_path) + + governance_xsd_path = get_transport_schema('dds', 'governance.xsd') + governance_xsd = lxml.etree.XMLSchema(lxml.etree.parse(governance_xsd_path)) + + domain_id_elements = governance_xml.findall( + 'domain_access_rules/domain_rule/domains/id') + for domain_id_element in domain_id_elements: + domain_id_element.text = domain_id + + try: + governance_xsd.assertValid(governance_xml) + except lxml.etree.DocumentInvalid as e: + raise RuntimeError(str(e)) + + with open(path, 'wb') as f: + f.write(lxml.etree.tostring(governance_xml, pretty_print=True)) diff --git a/sros2/sros2/api/_utilities.py b/sros2/sros2/api/_utilities.py new file mode 100644 index 00000000..adee18b1 --- /dev/null +++ b/sros2/sros2/api/_utilities.py @@ -0,0 +1,155 @@ +# Copyright 2020 Canonical Ltd +# Copyright 2016-2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import os + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend as cryptography_backend +from cryptography.hazmat.bindings.openssl.binding import Binding as SSLBinding +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec + +_DOMAIN_ID_ENV = 'ROS_DOMAIN_ID' + + +def create_symlink(*, src, dst): + if os.path.exists(dst): + src_abs_path = os.path.join(os.path.dirname(dst), src) + if os.path.samefile(src_abs_path, dst): + return + print(f"Existing symlink '{dst}' does not match '{src_abs_path}', overriding it!") + os.remove(dst) + os.symlink(src=src, dst=dst) + + +def domain_id() -> str: + return os.getenv(_DOMAIN_ID_ENV, '0') + + +def create_smime_signed_file(cert_path, key_path, unsigned_file_path, signed_file_path): + # Load the CA cert and key from disk + with open(cert_path, 'rb') as cert_file: + cert = x509.load_pem_x509_certificate( + cert_file.read(), cryptography_backend()) + + with open(key_path, 'rb') as key_file: + private_key = serialization.load_pem_private_key( + key_file.read(), None, cryptography_backend()) + + # Get the contents of the unsigned file, which we're about to sign + with open(unsigned_file_path, 'rb') as f: + content = f.read() + + # Sign the contents, and write the result to the appropriate place + with open(signed_file_path, 'wb') as f: + f.write(_sign_bytes(cert, private_key, content)) + + +def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''): + if not issuer_name: + issuer_name = subject_name + + # DDS-Security section 9.3.1 calls for prime256v1, for which SECP256R1 is an alias + private_key = ec.generate_private_key(ec.SECP256R1, cryptography_backend()) + if not ca_key: + ca_key = private_key + + if ca: + extension = x509.BasicConstraints(ca=True, path_length=1) + else: + extension = x509.BasicConstraints(ca=False, path_length=None) + + utcnow = datetime.datetime.utcnow() + builder = x509.CertificateBuilder( + ).issuer_name( + issuer_name + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + utcnow + ).not_valid_after( + # TODO: This should not be hard-coded + utcnow + datetime.timedelta(days=3650) + ).public_key( + private_key.public_key() + ).subject_name( + subject_name + ).add_extension( + extension, critical=ca + ) + cert = builder.sign(ca_key, hashes.SHA256(), cryptography_backend()) + + return (cert, private_key) + + +def write_key( + key, + key_path, + *, + encoding=serialization.Encoding.PEM, + serialization_format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() +): + with open(key_path, 'wb') as f: + f.write(key.private_bytes( + encoding=encoding, + format=serialization_format, + encryption_algorithm=encryption_algorithm)) + + +def write_cert(cert, cert_path, *, encoding=serialization.Encoding.PEM): + with open(cert_path, 'wb') as f: + f.write(cert.public_bytes(encoding=encoding)) + + +def _sign_bytes(cert, key, byte_string): + # Using two flags here to get the output required: + # - PKCS7_DETACHED: Use cleartext signing + # - PKCS7_TEXT: Set the MIME headers for text/plain + flags = SSLBinding.lib.PKCS7_DETACHED + flags |= SSLBinding.lib.PKCS7_TEXT + + # Convert the byte string into a buffer for SSL + bio_in = SSLBinding.lib.BIO_new_mem_buf(byte_string, len(byte_string)) + try: + pkcs7 = SSLBinding.lib.PKCS7_sign( + cert._x509, key._evp_pkey, SSLBinding.ffi.NULL, bio_in, flags) + finally: + # Free the memory allocated for the buffer + SSLBinding.lib.BIO_free(bio_in) + + # PKCS7_sign consumes the buffer; allocate a new one again to get it into the final document + bio_in = SSLBinding.lib.BIO_new_mem_buf(byte_string, len(byte_string)) + try: + # Allocate a buffer for the output document + bio_out = SSLBinding.lib.BIO_new(SSLBinding.lib.BIO_s_mem()) + try: + # Write the final document out to the buffer + SSLBinding.lib.SMIME_write_PKCS7(bio_out, pkcs7, bio_in, flags) + + # Copy the output document back to python-managed memory + result_buffer = SSLBinding.ffi.new('char**') + buffer_length = SSLBinding.lib.BIO_get_mem_data(bio_out, result_buffer) + output = SSLBinding.ffi.buffer(result_buffer[0], buffer_length)[:] + finally: + # Free the memory required for the output buffer + SSLBinding.lib.BIO_free(bio_out) + finally: + # Free the memory allocated for the input buffer + SSLBinding.lib.BIO_free(bio_in) + + return output diff --git a/sros2/sros2/verb/create_keystore.py b/sros2/sros2/verb/create_keystore.py index 64dab156..2a47070d 100644 --- a/sros2/sros2/verb/create_keystore.py +++ b/sros2/sros2/verb/create_keystore.py @@ -18,7 +18,7 @@ def DirectoriesCompleter(): return None -from sros2.api import create_keystore +from sros2.api import _keystore from sros2.verb import VerbExtension @@ -30,5 +30,5 @@ def add_arguments(self, parser, cli_name): arg.completer = DirectoriesCompleter() def main(self, *, args): - success = create_keystore(args.ROOT) + success = _keystore.create_keystore(args.ROOT) return 0 if success else 1 diff --git a/sros2/test/sros2/commands/security/verbs/test_create_key.py b/sros2/test/sros2/commands/security/verbs/test_create_key.py index 95c54395..c93585fa 100644 --- a/sros2/test/sros2/commands/security/verbs/test_create_key.py +++ b/sros2/test/sros2/commands/security/verbs/test_create_key.py @@ -27,8 +27,7 @@ from ros2cli import cli -from sros2.api import _DEFAULT_COMMON_NAME -from sros2.api import create_keystore +from sros2.api import _keystore from sros2.policy import get_transport_schema @@ -38,7 +37,7 @@ def security_context_keys_dir(tmpdir_factory): keystore_dir = Path(str(tmpdir_factory.mktemp('keystore'))) # First, create the keystore - assert create_keystore(keystore_dir) + assert _keystore.create_keystore(keystore_dir) # Now using that keystore, create a keypair along with other files required by DDS assert cli.main( @@ -104,7 +103,7 @@ def test_create_key(security_context_keys_dir): def test_cert_pem(security_context_keys_dir): cert = load_cert(security_context_keys_dir / 'cert.pem') check_common_name(cert.subject, u'/test_security_context') - check_common_name(cert.issuer, _DEFAULT_COMMON_NAME) + check_common_name(cert.issuer, _keystore._DEFAULT_COMMON_NAME) # Verify that the hash algorithm is as expected assert isinstance(cert.signature_hash_algorithm, hashes.SHA256) @@ -140,8 +139,8 @@ def test_governance_p7s(security_context_keys_dir): def test_identity_ca_cert_pem(security_context_keys_dir): cert = load_cert(security_context_keys_dir / 'identity_ca.cert.pem') - check_common_name(cert.subject, _DEFAULT_COMMON_NAME) - check_common_name(cert.issuer, _DEFAULT_COMMON_NAME) + check_common_name(cert.subject, _keystore._DEFAULT_COMMON_NAME) + check_common_name(cert.issuer, _keystore._DEFAULT_COMMON_NAME) def test_key_pem(security_context_keys_dir): @@ -173,8 +172,8 @@ def test_permissions_xml(security_context_keys_dir): def test_permissions_ca_cert_pem(security_context_keys_dir): cert = load_cert(security_context_keys_dir / 'permissions_ca.cert.pem') - check_common_name(cert.subject, _DEFAULT_COMMON_NAME) - check_common_name(cert.issuer, _DEFAULT_COMMON_NAME) + check_common_name(cert.subject, _keystore._DEFAULT_COMMON_NAME) + check_common_name(cert.issuer, _keystore._DEFAULT_COMMON_NAME) signatory = load_cert(security_context_keys_dir / 'identity_ca.cert.pem') assert verify_signature(cert, signatory) diff --git a/sros2/test/sros2/commands/security/verbs/test_create_keystore.py b/sros2/test/sros2/commands/security/verbs/test_create_keystore.py index 1874b8be..178ed79f 100644 --- a/sros2/test/sros2/commands/security/verbs/test_create_keystore.py +++ b/sros2/test/sros2/commands/security/verbs/test_create_keystore.py @@ -22,7 +22,7 @@ import pytest from ros2cli import cli -from sros2.api import _DEFAULT_COMMON_NAME +from sros2.api import _keystore # This fixture will run once for the entire module (as opposed to once per test) @@ -69,7 +69,7 @@ def test_ca_cert(keystore_dir): cert = x509.load_pem_x509_certificate(f.read(), cryptography_backend()) names = cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME) assert len(names) == 1 - assert names[0].value == _DEFAULT_COMMON_NAME + assert names[0].value == _keystore._DEFAULT_COMMON_NAME names = cert.subject.get_attributes_for_oid(x509.oid.NameOID.ORGANIZATION_NAME) assert len(names) == 0 diff --git a/sros2/test/sros2/commands/security/verbs/test_list_keys.py b/sros2/test/sros2/commands/security/verbs/test_list_keys.py index 0192c4bf..176646b0 100644 --- a/sros2/test/sros2/commands/security/verbs/test_list_keys.py +++ b/sros2/test/sros2/commands/security/verbs/test_list_keys.py @@ -16,14 +16,14 @@ import tempfile from ros2cli import cli -from sros2.api import create_key, create_keystore +from sros2.api import _keystore, create_key def test_list_keys(capsys): with tempfile.TemporaryDirectory() as keystore_dir: with capsys.disabled(): # First, create the keystore - assert create_keystore(keystore_dir) + assert _keystore.create_keystore(keystore_dir) # Now using that keystore, create a keypair assert create_key(keystore_dir, '/test_context') @@ -37,7 +37,7 @@ def test_list_keys_no_keys(capsys): with tempfile.TemporaryDirectory() as keystore_dir: with capsys.disabled(): # First, create the keystore - assert create_keystore(keystore_dir) + assert _keystore.create_keystore(keystore_dir) # Now verify that empty keystore we just created contains no keys assert cli.main(argv=['security', 'list_keys', keystore_dir]) == 0