Skip to content

Commit

Permalink
CDS/CDNSKEY utilities (#872)
Browse files Browse the repository at this point in the history
Add CDS and CDNSKEY utilities:

make_cdnskey()
make_cds()
make_ds_rdataset()
cds_rdataset_to_ds_rdataset()
dnskey_rdataset_to_cds_rdataset()
dnskey_rdataset_to_cdnskey_rdataset()
  • Loading branch information
jschlyter authored Dec 23, 2022
1 parent 8e200f9 commit 620e9de
Show file tree
Hide file tree
Showing 2 changed files with 349 additions and 11 deletions.
250 changes: 239 additions & 11 deletions dns/dnssec.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

"""Common DNSSEC-related functions and constants."""

from typing import Any, cast, Dict, List, Optional, Tuple, Union
from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union

import hashlib
import math
Expand All @@ -36,6 +36,8 @@
import dns.rdatatype
import dns.rdataclass
import dns.rrset
from dns.rdtypes.ANY.CDNSKEY import CDNSKEY
from dns.rdtypes.ANY.CDS import CDS
from dns.rdtypes.ANY.DNSKEY import DNSKEY
from dns.rdtypes.ANY.DS import DS
from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime
Expand Down Expand Up @@ -109,7 +111,7 @@ def to_timestamp(value: Union[datetime, str, float, int]) -> int:
raise TypeError("Unsupported timestamp type")


def key_id(key: DNSKEY) -> int:
def key_id(key: Union[DNSKEY,CDNSKEY]) -> int:
"""Return the key id (a 16-bit number) for the specified key.
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY``
Expand Down Expand Up @@ -193,7 +195,7 @@ def make_ds(
*name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record.
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY``, the key the DS is about.
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``, the key the DS is about.
*algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
Expand Down Expand Up @@ -230,8 +232,8 @@ def make_ds(
check = policy.ok_to_create_ds
if not check(algorithm):
raise DeniedByPolicy
if not isinstance(key, DNSKEY):
raise ValueError("key is not a DNSKEY")
if not isinstance(key, (DNSKEY, CDNSKEY)):
raise ValueError("key is not a DNSKEY/CDNSKEY")
if algorithm == DSDigest.SHA1:
dshash = hashlib.sha1()
elif algorithm == DSDigest.SHA256:
Expand All @@ -256,6 +258,41 @@ def make_ds(
return cast(DS, ds)


def make_cds(
name: Union[dns.name.Name, str],
key: dns.rdata.Rdata,
algorithm: Union[DSDigest, str],
origin: Optional[dns.name.Name] = None,
) -> CDS:
"""Create a CDS record for a DNSSEC key.
*name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record.
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``, key the DS is about.
*algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
does not matter for these strings.
*origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name,
then it will be made absolute using the specified origin.
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown.
Returns a ``dns.rdtypes.ANY.DS.CDS``
"""

ds = make_ds(name, key, algorithm, origin)
return CDS(
rdclass=ds.rdclass,
rdtype=dns.rdatatype.CDS,
key_tag=ds.key_tag,
algorithm=ds.algorithm,
digest_type=ds.digest_type,
digest=ds.digest,
)


def _find_candidate_keys(
keys: Dict[dns.name.Name, Union[dns.rdataset.Rdataset, dns.node.Node]], rrsig: RRSIG
) -> Optional[List[DNSKEY]]:
Expand Down Expand Up @@ -376,6 +413,15 @@ def _bytes_to_long(b: bytes) -> int:
return int.from_bytes(b, "big")


def _get_rrname_rdataset(
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
) -> Tuple[dns.name.Name, dns.rdataset.Rdataset]:
if isinstance(rrset, tuple):
return rrset[0], rrset[1]
else:
return rrset.name, rrset


def _validate_signature(sig: bytes, data: bytes, key: DNSKEY, chosen_hash: Any) -> None:
keyptr: bytes
if _is_rsa(key.algorithm):
Expand Down Expand Up @@ -798,12 +844,7 @@ def _make_rrsig_signature_data(

# For convenience, allow the rrset to be specified as a (name,
# rdataset) tuple as well as a proper rrset
if isinstance(rrset, tuple):
rrname = rrset[0]
rdataset = rrset[1]
else:
rrname = rrset.name
rdataset = rrset
rrname, rdataset = _get_rrname_rdataset(rrset)

data = b""
data += rrsig.to_wire(origin=signer)[:18]
Expand Down Expand Up @@ -927,6 +968,44 @@ def encode_ecdsa_public_key(public_key: "ec.EllipticCurvePublicKey") -> bytes:
)


def _make_cdnskey(
public_key: PublicKey,
algorithm: Union[int, str],
flags: int = Flag.ZONE,
protocol: int = 3,
) -> CDNSKEY:
"""Convert a public key to CDNSKEY Rdata
*public_key*, the public key to convert, a
``cryptography.hazmat.primitives.asymmetric`` public key class applicable
for DNSSEC.
*algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm.
*flags: DNSKEY flags field as an integer.
*protocol*: DNSKEY protocol field as an integer.
Raises ``ValueError`` if the specified key algorithm parameters are not
unsupported, ``TypeError`` if the key type is unsupported,
`UnsupportedAlgorithm` if the algorithm is unknown and
`AlgorithmKeyMismatch` if the algorithm does not match the key type.
Return CDNSKEY ``Rdata``.
"""

dnskey = _make_dnskey(public_key, algorithm, flags, protocol)

return CDNSKEY(
rdclass=dnskey.rdclass,
rdtype=dns.rdatatype.CDNSKEY,
flags=dnskey.flags,
protocol=dnskey.protocol,
algorithm=dnskey.algorithm,
key=dnskey.key,
)


def nsec3_hash(
domain: Union[dns.name.Name, str],
salt: Optional[Union[str, bytes]],
Expand Down Expand Up @@ -988,6 +1067,153 @@ def nsec3_hash(
return output


def make_ds_rdataset(
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
algorithms: Set[Union[DSDigest, str]],
origin: Optional[dns.name.Name] = None,
) -> dns.rdataset.Rdataset:
"""Create a DS record from DNSKEY/CDNSKEY/CDS.
*rrset*, the RRset to create DS Rdataset for. This can be a
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``)
tuple.
*algorithms*, a set of ``str`` or ``int`` specifying the hash algorithms.
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
does not matter for these strings. If the RRset is a CDS, only digest
algorithms matching algorithms are accepted.
*origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name,
then it will be made absolute using the specified origin.
Raises ``UnsupportedAlgorithm`` if any of the algorithms are unknown and
``ValueError`` if the given RRset is not usable.
Returns a ``dns.rdataset.Rdataset``
"""

rrname, rdataset = _get_rrname_rdataset(rrset)

if rdataset.rdtype not in (
dns.rdatatype.DNSKEY,
dns.rdatatype.CDNSKEY,
dns.rdatatype.CDS,
):
raise ValueError("rrset not a DNSKEY/CDNSKEY/CDS")

_algorithms = set()
for algorithm in algorithms:
try:
if isinstance(algorithm, str):
algorithm = DSDigest[algorithm.upper()]
except Exception:
raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)
_algorithms.add(algorithm)

if rdataset.rdtype == dns.rdatatype.CDS:
res = []
for rdata in cds_rdataset_to_ds_rdataset(rdataset):
if rdata.digest_type in _algorithms:
res.append(rdata)
if not len(res):
raise ValueError("no acceptable CDS rdata found")
return dns.rdataset.from_rdata_list(rdataset.ttl, res)

res = []
for algorithm in _algorithms:
res.extend(dnskey_rdataset_to_cds_rdataset(rrname, rdataset, algorithm, origin))
return dns.rdataset.from_rdata_list(rdataset.ttl, res)


def cds_rdataset_to_ds_rdataset(
rdataset: dns.rdataset.Rdataset,
) -> dns.rdataset.Rdataset:
"""Create a CDS record from DS.
*rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for.
Raises ``ValueError`` if the rdataset is not CDS.
Returns a ``dns.rdataset.Rdataset``
"""

if rdataset.rdtype != dns.rdatatype.CDS:
raise ValueError("rdataset not a CDS")
res = []
for rdata in rdataset:
res.append(
CDS(
rdclass=rdata.rdclass,
rdtype=dns.rdatatype.DS,
key_tag=rdata.key_tag,
algorithm=rdata.algorithm,
digest_type=rdata.digest_type,
digest=rdata.digest,
)
)
return dns.rdataset.from_rdata_list(rdataset.ttl, res)


def dnskey_rdataset_to_cds_rdataset(
name: Union[dns.name.Name, str],
rdataset: dns.rdataset.Rdataset,
algorithm: Union[DSDigest, str],
origin: Optional[dns.name.Name] = None,
) -> dns.rdataset.Rdataset:
"""Create a CDS record from DNSKEY/CDNSKEY.
*name*, a ``dns.name.Name`` or ``str``, the owner name of the CDS record.
*rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for.
*algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
does not matter for these strings.
*origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name,
then it will be made absolute using the specified origin.
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown or
``ValueError`` if the rdataset is not DNSKEY/CDNSKEY.
Returns a ``dns.rdataset.Rdataset``
"""

if rdataset.rdtype not in (dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY):
raise ValueError("rdataset not a DNSKEY/CDNSKEY")
res = []
for rdata in rdataset:
res.append(make_cds(name, rdata, algorithm, origin))
return dns.rdataset.from_rdata_list(rdataset.ttl, res)


def dnskey_rdataset_to_cdnskey_rdataset(
rdataset: dns.rdataset.Rdataset,
) -> dns.rdataset.Rdataset:
"""Create a CDNSKEY record from DNSKEY.
*rdataset*, a ``dns.rdataset.Rdataset``, to create CDNSKEY Rdataset for.
Returns a ``dns.rdataset.Rdataset``
"""

if rdataset.rdtype != dns.rdatatype.DNSKEY:
raise ValueError("rdataset not a DNSKEY")
res = []
for rdata in rdataset:
res.append(
CDNSKEY(
rdclass=rdataset.rdclass,
rdtype=rdataset.rdtype,
flags=rdata.flags,
protocol=rdata.protocol,
algorithm=rdata.algorithm,
key=rdata.key,
)
)
return dns.rdataset.from_rdata_list(rdataset.ttl, res)


def _need_pyca(*args, **kwargs):
raise ImportError(
"DNSSEC validation requires " + "python cryptography"
Expand All @@ -1010,12 +1236,14 @@ def _need_pyca(*args, **kwargs):
validate_rrsig = _need_pyca
sign = _need_pyca
make_dnskey = _need_pyca
make_cdnskey = _need_pyca
_have_pyca = False
else:
validate = _validate # type: ignore
validate_rrsig = _validate_rrsig # type: ignore
sign = _sign
make_dnskey = _make_dnskey
make_cdnskey = _make_cdnskey
_have_pyca = True

### BEGIN generated Algorithm constants
Expand Down
Loading

0 comments on commit 620e9de

Please sign in to comment.