From 620e9ded364e5b2f6a2a50deff159f33dea3f210 Mon Sep 17 00:00:00 2001 From: Jakob Schlyter Date: Fri, 23 Dec 2022 18:11:31 +0100 Subject: [PATCH] CDS/CDNSKEY utilities (#872) Add CDS and CDNSKEY utilities: make_cdnskey() make_cds() make_ds_rdataset() cds_rdataset_to_ds_rdataset() dnskey_rdataset_to_cds_rdataset() dnskey_rdataset_to_cdnskey_rdataset() --- dns/dnssec.py | 250 +++++++++++++++++++++++++++++++++++++++++-- tests/test_dnssec.py | 110 +++++++++++++++++++ 2 files changed, 349 insertions(+), 11 deletions(-) diff --git a/dns/dnssec.py b/dns/dnssec.py index 4cfb75e3b..3589b1f14 100644 --- a/dns/dnssec.py +++ b/dns/dnssec.py @@ -17,7 +17,7 @@ """Common DNSSEC-related functions and constants.""" -from typing import Any, cast, Dict, List, Optional, Tuple, Union +from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union import hashlib import math @@ -36,6 +36,8 @@ import dns.rdatatype import dns.rdataclass import dns.rrset +from dns.rdtypes.ANY.CDNSKEY import CDNSKEY +from dns.rdtypes.ANY.CDS import CDS from dns.rdtypes.ANY.DNSKEY import DNSKEY from dns.rdtypes.ANY.DS import DS from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime @@ -109,7 +111,7 @@ def to_timestamp(value: Union[datetime, str, float, int]) -> int: raise TypeError("Unsupported timestamp type") -def key_id(key: DNSKEY) -> int: +def key_id(key: Union[DNSKEY,CDNSKEY]) -> int: """Return the key id (a 16-bit number) for the specified key. *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` @@ -193,7 +195,7 @@ def make_ds( *name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record. - *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY``, the key the DS is about. + *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``, the key the DS is about. *algorithm*, a ``str`` or ``int`` specifying the hash algorithm. The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case @@ -230,8 +232,8 @@ def make_ds( check = policy.ok_to_create_ds if not check(algorithm): raise DeniedByPolicy - if not isinstance(key, DNSKEY): - raise ValueError("key is not a DNSKEY") + if not isinstance(key, (DNSKEY, CDNSKEY)): + raise ValueError("key is not a DNSKEY/CDNSKEY") if algorithm == DSDigest.SHA1: dshash = hashlib.sha1() elif algorithm == DSDigest.SHA256: @@ -256,6 +258,41 @@ def make_ds( return cast(DS, ds) +def make_cds( + name: Union[dns.name.Name, str], + key: dns.rdata.Rdata, + algorithm: Union[DSDigest, str], + origin: Optional[dns.name.Name] = None, +) -> CDS: + """Create a CDS record for a DNSSEC key. + + *name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record. + + *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``, key the DS is about. + + *algorithm*, a ``str`` or ``int`` specifying the hash algorithm. + The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case + does not matter for these strings. + + *origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name, + then it will be made absolute using the specified origin. + + Raises ``UnsupportedAlgorithm`` if the algorithm is unknown. + + Returns a ``dns.rdtypes.ANY.DS.CDS`` + """ + + ds = make_ds(name, key, algorithm, origin) + return CDS( + rdclass=ds.rdclass, + rdtype=dns.rdatatype.CDS, + key_tag=ds.key_tag, + algorithm=ds.algorithm, + digest_type=ds.digest_type, + digest=ds.digest, + ) + + def _find_candidate_keys( keys: Dict[dns.name.Name, Union[dns.rdataset.Rdataset, dns.node.Node]], rrsig: RRSIG ) -> Optional[List[DNSKEY]]: @@ -376,6 +413,15 @@ def _bytes_to_long(b: bytes) -> int: return int.from_bytes(b, "big") +def _get_rrname_rdataset( + rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], +) -> Tuple[dns.name.Name, dns.rdataset.Rdataset]: + if isinstance(rrset, tuple): + return rrset[0], rrset[1] + else: + return rrset.name, rrset + + def _validate_signature(sig: bytes, data: bytes, key: DNSKEY, chosen_hash: Any) -> None: keyptr: bytes if _is_rsa(key.algorithm): @@ -798,12 +844,7 @@ def _make_rrsig_signature_data( # For convenience, allow the rrset to be specified as a (name, # rdataset) tuple as well as a proper rrset - if isinstance(rrset, tuple): - rrname = rrset[0] - rdataset = rrset[1] - else: - rrname = rrset.name - rdataset = rrset + rrname, rdataset = _get_rrname_rdataset(rrset) data = b"" data += rrsig.to_wire(origin=signer)[:18] @@ -927,6 +968,44 @@ def encode_ecdsa_public_key(public_key: "ec.EllipticCurvePublicKey") -> bytes: ) +def _make_cdnskey( + public_key: PublicKey, + algorithm: Union[int, str], + flags: int = Flag.ZONE, + protocol: int = 3, +) -> CDNSKEY: + """Convert a public key to CDNSKEY Rdata + + *public_key*, the public key to convert, a + ``cryptography.hazmat.primitives.asymmetric`` public key class applicable + for DNSSEC. + + *algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm. + + *flags: DNSKEY flags field as an integer. + + *protocol*: DNSKEY protocol field as an integer. + + Raises ``ValueError`` if the specified key algorithm parameters are not + unsupported, ``TypeError`` if the key type is unsupported, + `UnsupportedAlgorithm` if the algorithm is unknown and + `AlgorithmKeyMismatch` if the algorithm does not match the key type. + + Return CDNSKEY ``Rdata``. + """ + + dnskey = _make_dnskey(public_key, algorithm, flags, protocol) + + return CDNSKEY( + rdclass=dnskey.rdclass, + rdtype=dns.rdatatype.CDNSKEY, + flags=dnskey.flags, + protocol=dnskey.protocol, + algorithm=dnskey.algorithm, + key=dnskey.key, + ) + + def nsec3_hash( domain: Union[dns.name.Name, str], salt: Optional[Union[str, bytes]], @@ -988,6 +1067,153 @@ def nsec3_hash( return output +def make_ds_rdataset( + rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], + algorithms: Set[Union[DSDigest, str]], + origin: Optional[dns.name.Name] = None, +) -> dns.rdataset.Rdataset: + """Create a DS record from DNSKEY/CDNSKEY/CDS. + + *rrset*, the RRset to create DS Rdataset for. This can be a + ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) + tuple. + + *algorithms*, a set of ``str`` or ``int`` specifying the hash algorithms. + The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case + does not matter for these strings. If the RRset is a CDS, only digest + algorithms matching algorithms are accepted. + + *origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name, + then it will be made absolute using the specified origin. + + Raises ``UnsupportedAlgorithm`` if any of the algorithms are unknown and + ``ValueError`` if the given RRset is not usable. + + Returns a ``dns.rdataset.Rdataset`` + """ + + rrname, rdataset = _get_rrname_rdataset(rrset) + + if rdataset.rdtype not in ( + dns.rdatatype.DNSKEY, + dns.rdatatype.CDNSKEY, + dns.rdatatype.CDS, + ): + raise ValueError("rrset not a DNSKEY/CDNSKEY/CDS") + + _algorithms = set() + for algorithm in algorithms: + try: + if isinstance(algorithm, str): + algorithm = DSDigest[algorithm.upper()] + except Exception: + raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) + _algorithms.add(algorithm) + + if rdataset.rdtype == dns.rdatatype.CDS: + res = [] + for rdata in cds_rdataset_to_ds_rdataset(rdataset): + if rdata.digest_type in _algorithms: + res.append(rdata) + if not len(res): + raise ValueError("no acceptable CDS rdata found") + return dns.rdataset.from_rdata_list(rdataset.ttl, res) + + res = [] + for algorithm in _algorithms: + res.extend(dnskey_rdataset_to_cds_rdataset(rrname, rdataset, algorithm, origin)) + return dns.rdataset.from_rdata_list(rdataset.ttl, res) + + +def cds_rdataset_to_ds_rdataset( + rdataset: dns.rdataset.Rdataset, +) -> dns.rdataset.Rdataset: + """Create a CDS record from DS. + + *rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for. + + Raises ``ValueError`` if the rdataset is not CDS. + + Returns a ``dns.rdataset.Rdataset`` + """ + + if rdataset.rdtype != dns.rdatatype.CDS: + raise ValueError("rdataset not a CDS") + res = [] + for rdata in rdataset: + res.append( + CDS( + rdclass=rdata.rdclass, + rdtype=dns.rdatatype.DS, + key_tag=rdata.key_tag, + algorithm=rdata.algorithm, + digest_type=rdata.digest_type, + digest=rdata.digest, + ) + ) + return dns.rdataset.from_rdata_list(rdataset.ttl, res) + + +def dnskey_rdataset_to_cds_rdataset( + name: Union[dns.name.Name, str], + rdataset: dns.rdataset.Rdataset, + algorithm: Union[DSDigest, str], + origin: Optional[dns.name.Name] = None, +) -> dns.rdataset.Rdataset: + """Create a CDS record from DNSKEY/CDNSKEY. + + *name*, a ``dns.name.Name`` or ``str``, the owner name of the CDS record. + + *rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for. + + *algorithm*, a ``str`` or ``int`` specifying the hash algorithm. + The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case + does not matter for these strings. + + *origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name, + then it will be made absolute using the specified origin. + + Raises ``UnsupportedAlgorithm`` if the algorithm is unknown or + ``ValueError`` if the rdataset is not DNSKEY/CDNSKEY. + + Returns a ``dns.rdataset.Rdataset`` + """ + + if rdataset.rdtype not in (dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY): + raise ValueError("rdataset not a DNSKEY/CDNSKEY") + res = [] + for rdata in rdataset: + res.append(make_cds(name, rdata, algorithm, origin)) + return dns.rdataset.from_rdata_list(rdataset.ttl, res) + + +def dnskey_rdataset_to_cdnskey_rdataset( + rdataset: dns.rdataset.Rdataset, +) -> dns.rdataset.Rdataset: + """Create a CDNSKEY record from DNSKEY. + + *rdataset*, a ``dns.rdataset.Rdataset``, to create CDNSKEY Rdataset for. + + Returns a ``dns.rdataset.Rdataset`` + """ + + if rdataset.rdtype != dns.rdatatype.DNSKEY: + raise ValueError("rdataset not a DNSKEY") + res = [] + for rdata in rdataset: + res.append( + CDNSKEY( + rdclass=rdataset.rdclass, + rdtype=rdataset.rdtype, + flags=rdata.flags, + protocol=rdata.protocol, + algorithm=rdata.algorithm, + key=rdata.key, + ) + ) + return dns.rdataset.from_rdata_list(rdataset.ttl, res) + + def _need_pyca(*args, **kwargs): raise ImportError( "DNSSEC validation requires " + "python cryptography" @@ -1010,12 +1236,14 @@ def _need_pyca(*args, **kwargs): validate_rrsig = _need_pyca sign = _need_pyca make_dnskey = _need_pyca + make_cdnskey = _need_pyca _have_pyca = False else: validate = _validate # type: ignore validate_rrsig = _validate_rrsig # type: ignore sign = _sign make_dnskey = _make_dnskey + make_cdnskey = _make_cdnskey _have_pyca = True ### BEGIN generated Algorithm constants diff --git a/tests/test_dnssec.py b/tests/test_dnssec.py index 4a25cd2a0..098af6937 100644 --- a/tests/test_dnssec.py +++ b/tests/test_dnssec.py @@ -25,7 +25,9 @@ import dns.rdata import dns.rdataclass import dns.rdatatype +import dns.rdtypes.ANY.CDNSKEY import dns.rdtypes.ANY.CDS +import dns.rdtypes.ANY.DNSKEY import dns.rdtypes.ANY.DS import dns.rrset @@ -164,6 +166,12 @@ "57349 5 2 53A79A3E7488AB44FFC56B2D1109F0699D1796DD977E72108B841F96 E47D7013", ) +good_cds = dns.rdata.from_text( + dns.rdataclass.IN, + dns.rdatatype.CDS, + "57349 5 2 53A79A3E7488AB44FFC56B2D1109F0699D1796DD977E72108B841F96 E47D7013", +) + when2 = 1290425644 abs_example = dns.name.from_text("example") @@ -937,6 +945,10 @@ def testMakeSHA256DS(self): # type: () -> None ds = dns.dnssec.make_ds(abs_dnspython_org, sep_key, "SHA256") self.assertEqual(ds, good_ds) + def testMakeSHA256CDS(self): # type: () -> None + cds = dns.dnssec.make_cds(abs_dnspython_org, sep_key, "SHA256") + self.assertEqual(cds, good_cds) + def testInvalidAlgorithm(self): # type: () -> None algorithm: Any for algorithm in (10, "shax"): @@ -1006,6 +1018,80 @@ def testInvalidDigestLengthCDS0(self): # type: () -> None dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.CDS, record) self.assertEqual(msg, str(cm.exception)) + def testMakeCDS(self): # type: () -> None + name = dns.name.from_text("example.com") + key = ed448.Ed448PrivateKey.generate() + + for dnskey in [ + dns.dnssec.make_dnskey( + key.public_key(), algorithm=dns.dnssec.Algorithm.ED448 + ), + dns.dnssec.make_cdnskey( + key.public_key(), algorithm=dns.dnssec.Algorithm.ED448 + ), + ]: + dnskey_rdataset = dns.rdataset.from_rdata_list(3600, [dnskey]) + cds_rdataset = dns.dnssec.dnskey_rdataset_to_cds_rdataset( + name, dnskey_rdataset, "SHA256" + ) + self.assertEqual(len(dnskey_rdataset), len(cds_rdataset)) + for d, c in zip(dnskey_rdataset, cds_rdataset): + self.assertTrue( + isinstance( + d, + ( + dns.rdtypes.ANY.DNSKEY.DNSKEY, + dns.rdtypes.ANY.CDNSKEY.CDNSKEY, + ), + ) + ) + self.assertTrue(isinstance(c, dns.rdtypes.ANY.CDS.CDS)) + self.assertEqual(dns.dnssec.key_id(d), c.key_tag) + self.assertEqual(d.algorithm, c.algorithm) + + def testMakeManyDSfromCDS(self): # type: () -> None + name = dns.name.from_text("example.com") + nkeys = 3 + algorithms = ["SHA256", "SHA384"] + keys = [ed448.Ed448PrivateKey.generate() for _ in range(0, nkeys)] + + dnskeys = [ + dns.dnssec.make_dnskey( + key.public_key(), algorithm=dns.dnssec.Algorithm.ED448 + ) + for key in keys + ] + + dnskey_rdataset = dns.rdataset.from_rdata_list(3600, dnskeys) + + cds_rdataset = dns.dnssec.dnskey_rdataset_to_cds_rdataset( + name, dnskey_rdataset, "SHA256" + ) + cds_rrset = dns.rrset.from_rdata_list(name, 3600, cds_rdataset) + + ds_rdataset = dns.dnssec.make_ds_rdataset(cds_rrset, algorithms) + + self.assertEqual(len(cds_rdataset), nkeys) + + def testMakeManyDSfromDNSKEY(self): # type: () -> None + name = dns.name.from_text("example.com") + nkeys = 3 + algorithms = ["SHA256", "SHA384"] + keys = [ed448.Ed448PrivateKey.generate() for _ in range(0, nkeys)] + + dnskeys = [ + dns.dnssec.make_dnskey( + key.public_key(), algorithm=dns.dnssec.Algorithm.ED448 + ) + for key in keys + ] + + dnskey_rrset = dns.rrset.from_rdata_list(name, 3600, dnskeys) + + ds_rdataset = dns.dnssec.make_ds_rdataset(dnskey_rrset, algorithms) + + self.assertEqual(len(ds_rdataset), nkeys * len(algorithms)) + @unittest.skipUnless(dns.dnssec._have_pyca, "Python Cryptography cannot be imported") class DNSSECMakeDNSKEYTestCase(unittest.TestCase): @@ -1035,6 +1121,30 @@ def testInvalidMakeDNSKEY(self): # type: () -> None with self.assertRaises(ValueError): dns.dnssec.make_dnskey(key.public_key(), dns.dnssec.Algorithm.DSA) + def testMakeCDNSKEY(self): # type: () -> None + key = ed448.Ed448PrivateKey.generate() + dnskey = dns.dnssec.make_dnskey( + key.public_key(), algorithm=dns.dnssec.Algorithm.ED448 + ) + cdnskey = dns.dnssec.make_cdnskey( + key.public_key(), algorithm=dns.dnssec.Algorithm.ED448 + ) + + self.assertEqual(dnskey.flags, cdnskey.flags) + self.assertEqual(dnskey.protocol, cdnskey.protocol) + self.assertEqual(dnskey.algorithm, cdnskey.algorithm) + self.assertEqual(dnskey.key, cdnskey.key) + + dnskey_rdataset = dns.rdataset.from_rdata_list(3600, [dnskey]) + cdnskey_rdataset = dns.dnssec.dnskey_rdataset_to_cdnskey_rdataset( + dnskey_rdataset + ) + self.assertEqual(len(dnskey_rdataset), len(cdnskey_rdataset)) + for d, c in zip(dnskey_rdataset, cdnskey_rdataset): + self.assertTrue(isinstance(d, dns.rdtypes.ANY.DNSKEY.DNSKEY)) + self.assertTrue(isinstance(c, dns.rdtypes.ANY.CDNSKEY.CDNSKEY)) + self.assertEqual(d, c) + # XXXRTH This test is fine but is noticably slow, so I have commented it out for # now