diff --git a/sros2/sros2/api/__init__.py b/sros2/sros2/api/__init__.py index 5796736a..6492aaaf 100644 --- a/sros2/sros2/api/__init__.py +++ b/sros2/sros2/api/__init__.py @@ -14,13 +14,9 @@ from collections import namedtuple import datetime -import itertools import os -import platform import shutil -import subprocess import sys -import textwrap from cryptography import x509 from cryptography.hazmat.backends import default_backend as cryptography_backend @@ -89,44 +85,6 @@ def get_service_info(node, node_name): return get_topics(node_name, node.get_service_names_and_types_by_node) -def find_openssl_executable(): - if platform.system() != 'Darwin': - return 'openssl' - - brew_openssl_prefix_result = subprocess.run( - ['brew', '--prefix', 'openssl'], - stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - if brew_openssl_prefix_result.returncode: - raise RuntimeError('unable to find openssl from brew') - basepath = brew_openssl_prefix_result.stdout.decode().rstrip() - return os.path.join(basepath, 'bin', 'openssl') - - -def check_openssl_version(openssl_executable): - openssl_version_string_result = subprocess.run( - [openssl_executable, 'version'], - stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - if openssl_version_string_result.returncode: - raise RuntimeError('unable to invoke command: "%s"' % openssl_executable) - version = openssl_version_string_result.stdout.decode().rstrip() - openssl_version_string_list = version.split(' ') - if openssl_version_string_list[0].lower() != 'openssl': - raise RuntimeError( - "expected version of the format 'OpenSSL " - ".. '") - (major, minor, patch) = openssl_version_string_list[1].split('.') - major = int(major) - minor = int(minor) - if major < 1: - raise RuntimeError('need openssl 1.0.2 minimum') - if major == 1 and minor < 0: - raise RuntimeError('need openssl 1.0.2 minimum') - if major == 1 and minor == 0 and int(''.join(itertools.takewhile(str.isdigit, patch))) < 2: - raise RuntimeError('need openssl 1.0.2 minimum') - - def _write_key( key, key_path, @@ -147,103 +105,12 @@ def _write_cert(cert, cert_path, *, encoding=serialization.Encoding.PEM): f.write(cert.public_bytes(encoding=encoding)) -def create_ca_conf_file(path): - conf_string = textwrap.dedent("""\ - [ ca ] - default_ca = CA_default - - [ CA_default ] - dir = . - certs = $dir/certs - crl_dir = $dir/crl - database = $dir/index.txt - unique_subject = no - new_certs_dir = $dir - certificate = $dir/ca.cert.pem - private_key = $dir/ca.key.pem - serial = $dir/serial - crlnumber = $dir/crlnumber - crl = $dir/crl.pem - RANDFILE = $dir/private/.rand - name_opt = ca_default - cert_opt = ca_default - default_days = 1825 - default_crl_days = 30 - default_md = sha256 - preserve = no - policy = policy_match - x509_extensions = local_ca_extensions - # - # - # Copy extensions specified in the certificate request - # - copy_extensions = copy - - [ policy_match ] - countryName = optional - stateOrProvinceName = optional - organizationName = optional - organizationalUnitName = optional - commonName = supplied - emailAddress = optional - - # - # - # x509 extensions to use when generating server certificates. - # - [ local_ca_extensions ] - basicConstraints = CA:false - - [ req ] - prompt = no - distinguished_name = req_distinguished_name - string_mask = utf8only - x509_extensions = root_ca_extensions - - [ req_distinguished_name ] - commonName = {common_name} - - [ root_ca_extensions ] - basicConstraints = CA:true - """.format(common_name=_DEFAULT_COMMON_NAME)) - with open(path, 'w') as f: - f.write(conf_string) - - -def run_shell_command(cmd, in_path=None): - print('running command in path [%s]: %s' % (in_path, cmd)) - subprocess.call(cmd, shell=True, cwd=in_path) - - -def create_ecdsa_param_file(path): - openssl_executable = find_openssl_executable() - check_openssl_version(openssl_executable) - run_shell_command('%s ecparam -name prime256v1 > %s' % (openssl_executable, path)) - - def create_ca_key_cert(ca_key_out_path, ca_cert_out_path): - # DDS-Security 9.3.1 calls for prime256v1 - SECP256R1 is another alias for that - private_key = ec.generate_private_key(ec.SECP256R1, cryptography_backend()) - _write_key(private_key, ca_key_out_path) + cert, private_key = _build_key_and_cert( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, _DEFAULT_COMMON_NAME)]), + ca=True) - common_name = x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, _DEFAULT_COMMON_NAME) - builder = x509.CertificateBuilder( - ).issuer_name( - x509.Name([common_name]) - ).serial_number( - x509.random_serial_number() - ).not_valid_before( - datetime.datetime.today() - datetime.timedelta(days=1) - ).not_valid_after( - datetime.datetime.today() + datetime.timedelta(days=3650) - ).public_key( - private_key.public_key() - ).subject_name( - x509.Name([common_name]) - ).add_extension( - x509.BasicConstraints(ca=True, path_length=1), critical=True - ) - cert = builder.sign(private_key, hashes.SHA256(), cryptography_backend()) + _write_key(private_key, ca_key_out_path) _write_cert(cert, ca_cert_out_path) @@ -277,16 +144,9 @@ def create_keystore(keystore_path): else: print('directory already exists: %s' % keystore_path) - ca_conf_path = os.path.join(keystore_path, 'ca_conf.cnf') ca_key_path = os.path.join(keystore_path, 'ca.key.pem') ca_cert_path = os.path.join(keystore_path, 'ca.cert.pem') - if not os.path.isfile(ca_conf_path): - print('creating CA file: %s' % ca_conf_path) - create_ca_conf_file(ca_conf_path) - else: - print('found CA conf file, not writing a new one!') - if not (os.path.isfile(ca_key_path) and os.path.isfile(ca_cert_path)): print('creating new CA key/cert pair') create_ca_key_cert(ca_key_path, ca_cert_path) @@ -310,30 +170,17 @@ def create_keystore(keystore_path): else: print('found signed governance file, not creating a new one!') - # create index file - index_path = os.path.join(keystore_path, 'index.txt') - if not os.path.isfile(index_path): - with open(index_path, 'a'): - pass - - # create serial file - serial_path = os.path.join(keystore_path, 'serial') - if not os.path.isfile(serial_path): - with open(serial_path, 'w') as f: - f.write('1000') - print('all done! enjoy your keystore in %s' % keystore_path) print('cheers!') return True def is_valid_keystore(path): - res = os.path.isfile(os.path.join(path, 'ca_conf.cnf')) - res &= os.path.isfile(os.path.join(path, 'index.txt')) - res &= os.path.isfile(os.path.join(path, 'ca.key.pem')) - res &= os.path.isfile(os.path.join(path, 'ca.cert.pem')) - res &= os.path.isfile(os.path.join(path, 'governance.p7s')) - return res + return ( + os.path.isfile(os.path.join(path, 'ca.key.pem')) and + os.path.isfile(os.path.join(path, 'ca.cert.pem')) and + os.path.isfile(os.path.join(path, 'governance.p7s')) + ) def is_key_name_valid(name): @@ -351,41 +198,6 @@ def is_key_name_valid(name): return False -def create_request_file(path, name): - with open(path, 'w') as f: - f.write("""\ -prompt = no -string_mask = utf8only -distinguished_name = req_distinguished_name - -[ req_distinguished_name ] -commonName = %s -""" % name) - - -def create_key_and_cert_req(root, relative_path, cnf_path, ecdsa_param_path, key_path, req_path): - key_relpath = os.path.join(relative_path, 'key.pem') - ecdsa_param_relpath = os.path.join(relative_path, 'ecdsaparam') - cnf_relpath = os.path.join(relative_path, 'request.cnf') - key_relpath = os.path.join(relative_path, 'key.pem') - req_relpath = os.path.join(relative_path, 'req.pem') - openssl_executable = find_openssl_executable() - check_openssl_version(openssl_executable) - run_shell_command( - '%s req -nodes -new -newkey ec:%s -config %s -keyout %s -out %s' % - (openssl_executable, ecdsa_param_relpath, cnf_relpath, key_relpath, req_relpath), root) - - -def create_cert(root_path, relative_path): - req_relpath = os.path.join(relative_path, 'req.pem') - cert_relpath = os.path.join(relative_path, 'cert.pem') - openssl_executable = find_openssl_executable() - check_openssl_version(openssl_executable) - run_shell_command( - '%s ca -batch -create_serial -config ca_conf.cnf -days 3650 -in %s -out %s' % - (openssl_executable, req_relpath, cert_relpath), root_path) - - def create_permission_file(path, domain_id, policy_element): permissions_xsl_path = get_transport_template('dds', 'permissions.xsl') permissions_xsl = etree.XSLT(etree.parse(permissions_xsl_path)) @@ -464,6 +276,7 @@ def create_key(keystore_path, identity): os.makedirs(key_dir, exist_ok=True) # copy the CA cert in there + keystore_ca_key_path = os.path.join(keystore_path, 'ca.key.pem') keystore_ca_cert_path = os.path.join(keystore_path, 'ca.cert.pem') dest_identity_ca_cert_path = os.path.join(key_dir, 'identity_ca.cert.pem') dest_permissions_ca_cert_path = os.path.join(key_dir, 'permissions_ca.cert.pem') @@ -475,38 +288,14 @@ def create_key(keystore_path, identity): dest_governance_path = os.path.join(key_dir, 'governance.p7s') shutil.copyfile(keystore_governance_path, dest_governance_path) - ecdsa_param_path = os.path.join(key_dir, 'ecdsaparam') - if not os.path.isfile(ecdsa_param_path): - print('creating ECDSA param file: %s' % ecdsa_param_path) - create_ecdsa_param_file(ecdsa_param_path) - else: - print('found ECDSA param file, not writing a new one!') - - cnf_path = os.path.join(key_dir, 'request.cnf') - if not os.path.isfile(cnf_path): - create_request_file(cnf_path, identity) - else: - print('config file exists, not creating a new one: %s' % cnf_path) - - key_path = os.path.join(key_dir, 'key.pem') - req_path = os.path.join(key_dir, 'req.pem') - if not os.path.isfile(key_path) or not os.path.isfile(req_path): - print('creating key and cert request') - create_key_and_cert_req( - keystore_path, - relative_path, - cnf_path, - ecdsa_param_path, - key_path, req_path) - else: - print('found key and cert req; not creating new ones!') - cert_path = os.path.join(key_dir, 'cert.pem') - if not os.path.isfile(cert_path): - print('creating cert') - create_cert(keystore_path, relative_path) + key_path = os.path.join(key_dir, 'key.pem') + if not os.path.isfile(cert_path) or not os.path.isfile(key_path): + print('creating cert and key') + _create_key_and_cert( + keystore_ca_cert_path, keystore_ca_key_path, identity, cert_path, key_path) else: - print('found cert; not creating a new one!') + print('found cert and key; not creating new ones!') # create a wildcard permissions file for this node which can be overridden # later using a policy if desired @@ -631,3 +420,58 @@ def _create_smime_signed_file(cert_path, key_path, unsigned_file_path, signed_fi # Sign the contents, and write the result to the appropriate place with open(signed_file_path, 'wb') as f: f.write(_sign_bytes(cert, private_key, content)) + + +def _build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''): + if not issuer_name: + issuer_name = subject_name + + # DDS-Security section 9.3.1 calls for prime256v1, for which SECP256R1 is an alias + private_key = ec.generate_private_key(ec.SECP256R1, cryptography_backend()) + if not ca_key: + ca_key = private_key + + if ca: + extension = x509.BasicConstraints(ca=True, path_length=1) + else: + extension = x509.BasicConstraints(ca=False, path_length=None) + + now = datetime.datetime.now() + builder = x509.CertificateBuilder( + ).issuer_name( + issuer_name + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + now + ).not_valid_after( + # TODO: This should not be hard-coded + now + datetime.timedelta(days=3650) + ).public_key( + private_key.public_key() + ).subject_name( + subject_name + ).add_extension( + extension, critical=ca + ) + cert = builder.sign(ca_key, hashes.SHA256(), cryptography_backend()) + + return (cert, private_key) + + +def _create_key_and_cert( + keystore_ca_cert_path, keystore_ca_key_path, identity, cert_path, key_path): + # Load the CA cert and key from disk + with open(keystore_ca_cert_path, 'rb') as f: + ca_cert = x509.load_pem_x509_certificate(f.read(), cryptography_backend()) + + with open(keystore_ca_key_path, 'rb') as f: + ca_key = serialization.load_pem_private_key(f.read(), None, cryptography_backend()) + + cert, private_key = _build_key_and_cert( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, identity)]), + issuer_name=ca_cert.subject, + ca_key=ca_key) + + _write_key(private_key, key_path) + _write_cert(cert, cert_path) diff --git a/sros2/test/sros2/commands/security/verbs/test_create_key.py b/sros2/test/sros2/commands/security/verbs/test_create_key.py index e783654c..88df6aff 100644 --- a/sros2/test/sros2/commands/security/verbs/test_create_key.py +++ b/sros2/test/sros2/commands/security/verbs/test_create_key.py @@ -12,15 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import configparser +import datetime import os -import textwrap from xml.etree import ElementTree import cryptography from cryptography import x509 from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec import pytest @@ -70,6 +69,11 @@ def check_common_name(entity, expected_value): assert names[0].value == expected_value +def _datetimes_are_close(actual, expected): + # We can't check exact times, but an hour's resolution is fine for testing purposes + return actual <= expected and actual >= (expected - datetime.timedelta(hours=1)) + + def verify_signature(cert, signatory): try: signatory.public_key().verify( @@ -83,8 +87,8 @@ def verify_signature(cert, signatory): def test_create_key(node_keys_dir): expected_files = ( - 'cert.pem', 'ecdsaparam', 'governance.p7s', 'identity_ca.cert.pem', 'key.pem', - 'permissions.p7s', 'permissions.xml', 'permissions_ca.cert.pem', 'req.pem', 'request.cnf' + 'cert.pem', 'governance.p7s', 'identity_ca.cert.pem', 'key.pem', 'permissions.p7s', + 'permissions.xml', 'permissions_ca.cert.pem' ) assert len(os.listdir(node_keys_dir)) == len(expected_files) @@ -97,17 +101,26 @@ def test_cert_pem(node_keys_dir): check_common_name(cert.subject, u'/test_node') check_common_name(cert.issuer, u'sros2testCA') - signatory = load_cert(os.path.join(node_keys_dir, 'identity_ca.cert.pem')) - assert verify_signature(cert, signatory) + # Verify that the hash algorithm is as expected + assert isinstance(cert.signature_hash_algorithm, hashes.SHA256) + # Verify the cert is valid for the expected timespan + now = datetime.datetime.now() + assert _datetimes_are_close(cert.not_valid_before, now) + assert _datetimes_are_close(cert.not_valid_after, now + datetime.timedelta(days=3650)) -def test_ecdsaparam(node_keys_dir): - with open(os.path.join(node_keys_dir, 'ecdsaparam')) as f: - assert f.read() == textwrap.dedent("""\ - -----BEGIN EC PARAMETERS----- - BggqhkjOPQMBBw== - -----END EC PARAMETERS----- - """) + # Verify that the cert ensures this key cannot be used to sign others as a CA + assert len(cert.extensions) == 1 + extension = cert.extensions[0] + assert extension.critical is False + value = extension.value + assert isinstance(value, x509.BasicConstraints) + assert value.ca is False + assert value.path_length is None + + # Verify this cert is indeed signed by the keystore CA + signatory = load_cert(os.path.join(node_keys_dir, 'identity_ca.cert.pem')) + assert verify_signature(cert, signatory) def test_governance_p7s(node_keys_dir): @@ -128,8 +141,12 @@ def test_identity_ca_cert_pem(node_keys_dir): def test_key_pem(node_keys_dir): private_key = load_private_key(os.path.join(node_keys_dir, 'key.pem')) + assert isinstance(private_key, ec.EllipticCurvePrivateKey) + assert private_key.key_size == 256 + public_key = private_key.public_key() assert isinstance(public_key.curve, ec.SECP256R1) + assert public_key.key_size == 256 def test_permissions_p7s(node_keys_dir): @@ -153,32 +170,3 @@ def test_permissions_ca_cert_pem(node_keys_dir): signatory = load_cert(os.path.join(node_keys_dir, 'identity_ca.cert.pem')) assert verify_signature(cert, signatory) - - -def test_req_pem(node_keys_dir): - csr = load_csr(os.path.join(node_keys_dir, 'req.pem')) - check_common_name(csr.subject, u'/test_node') - - -def test_request_cnf(node_keys_dir): - config = configparser.ConfigParser() - - # ConfigParser doesn't support INI files without section headers, so pretend one - # is there - with open(os.path.join(node_keys_dir, 'request.cnf')) as f: - config.read_string('[root]\n' + f.read()) - - for expected_section in ('root', ' req_distinguished_name '): - assert expected_section in config.sections() - - root_config = config['root'] - for expected_root_key in ('prompt', 'string_mask', 'distinguished_name'): - assert expected_root_key in root_config - - assert root_config['prompt'] == 'no' - assert root_config['string_mask'] == 'utf8only' - assert root_config['distinguished_name'] == 'req_distinguished_name' - - req_config = config[' req_distinguished_name '] - assert 'commonName' in req_config - assert req_config['commonName'] == '/test_node' diff --git a/sros2/test/sros2/commands/security/verbs/test_create_keystore.py b/sros2/test/sros2/commands/security/verbs/test_create_keystore.py index 9b160e14..0c1cb99d 100644 --- a/sros2/test/sros2/commands/security/verbs/test_create_keystore.py +++ b/sros2/test/sros2/commands/security/verbs/test_create_keystore.py @@ -12,78 +12,68 @@ # See the License for the specific language governing permissions and # limitations under the License. -import configparser import os -import tempfile from xml.etree import ElementTree from cryptography import x509 from cryptography.hazmat.backends import default_backend as cryptography_backend from cryptography.hazmat.primitives.serialization import load_pem_private_key +import pytest + from ros2cli import cli +from sros2.api import _DEFAULT_COMMON_NAME + + +# This fixture will run once for the entire module (as opposed to once per test) +@pytest.fixture(scope='module') +def keystore_dir(tmp_path_factory): + keystore_dir = str(tmp_path_factory.mktemp('keystore')) + + # Create the keystore + assert cli.main(argv=['security', 'create_keystore', keystore_dir]) == 0 + + # Return path to keystore directory + return keystore_dir + + +def test_create_keystore(keystore_dir): + expected_files = ( + 'ca.cert.pem', 'ca.key.pem', 'governance.p7s', 'governance.xml' + ) + assert len(os.listdir(keystore_dir)) == len(expected_files) + + for expected_file in expected_files: + assert os.path.isfile(os.path.join(keystore_dir, expected_file)) + + +def test_ca_cert(keystore_dir): + with open(os.path.join(keystore_dir, 'ca.cert.pem'), 'rb') as f: + cert = x509.load_pem_x509_certificate(f.read(), cryptography_backend()) + names = cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME) + assert len(names) == 1 + assert names[0].value == _DEFAULT_COMMON_NAME + names = cert.subject.get_attributes_for_oid(x509.oid.NameOID.ORGANIZATION_NAME) + assert len(names) == 0 + + +def test_ca_key(keystore_dir): + with open(os.path.join(keystore_dir, 'ca.key.pem'), 'rb') as f: + key = load_pem_private_key(f.read(), password=None, backend=cryptography_backend()) + public = key.public_key() + assert public.curve.name == 'secp256r1' + + +def test_governance_p7s(keystore_dir): + # Would really like to verify the signature, but ffi just can't use + # that part of the OpenSSL API + with open(os.path.join(keystore_dir, 'governance.p7s')) as f: + lines = f.readlines() + assert lines[0] == 'MIME-Version: 1.0\n' + assert lines[1].startswith( + 'Content-Type: multipart/signed; protocol="application/x-pkcs7-signature"; micalg="sha-256";') # noqa -def test_create_keystore(): - def check_index_txt(path): - with open(path, 'r') as f: - lines = f.readlines() - assert len(lines) == 0 - - def check_ca_cert_pem(path): - with open(path, 'rb') as f: - cert = x509.load_pem_x509_certificate(f.read(), cryptography_backend()) - names = cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME) - assert len(names) == 1 - assert names[0].value == u'sros2testCA' - names = cert.subject.get_attributes_for_oid(x509.oid.NameOID.ORGANIZATION_NAME) - assert len(names) == 0 - - def check_ca_conf(path): - config = configparser.ConfigParser() - successful_reads = config.read(path) - assert len(successful_reads) == 1 - assert config.sections() == [ - ' ca ', - ' CA_default ', - ' policy_match ', - ' local_ca_extensions ', - ' req ', - ' req_distinguished_name ', - ' root_ca_extensions ', - ] - - def check_governance_xml(path): - # validates valid XML - ElementTree.parse(path) - - def check_ca_key_pem(path): - with open(path, 'rb') as f: - key = load_pem_private_key(f.read(), password=None, backend=cryptography_backend()) - public = key.public_key() - assert public.curve.name == 'secp256r1' - - def check_governance_p7s(path): - # Would really like to verify the signature, but ffi just can't use - # that part of the OpenSSL API - with open(path, 'r') as f: - lines = f.readlines() - assert lines[0] == 'MIME-Version: 1.0\n' - - with tempfile.TemporaryDirectory() as keystore_dir: - assert cli.main(argv=['security', 'create_keystore', keystore_dir]) == 0 - expected_files = ( - ('governance.p7s', check_governance_p7s), - ('index.txt', check_index_txt), - ('ca.cert.pem', check_ca_cert_pem), - ('ca_conf.cnf', check_ca_conf), - ('governance.xml', check_governance_xml), - ('ca.key.pem', check_ca_key_pem), - ('serial', None), - ) - - for expected_file, file_validator in expected_files: - path = os.path.join(keystore_dir, expected_file) - assert os.path.isfile(path), 'Expected output file %s was not found.' % expected_file - if file_validator: - file_validator(path) +def test_governance_xml(keystore_dir): + # Validates valid XML + ElementTree.parse(os.path.join(keystore_dir, 'governance.xml'))