Skip to content

Commit

Permalink
Big Black PR (ansible-collections#1784)
Browse files Browse the repository at this point in the history
* Black prep

* Black

* changelog

* Fix pylint unused-import in tests

* Split SSM connection plugin changes

* disable glue tests - bucket's missing

* Disable s3_logging and s3_sync tests

This commit was initially merged in https://github.com/ansible-collections/community.aws
See: ansible-collections/community.aws@2c4575c
  • Loading branch information
tremble authored and abikouo committed Oct 26, 2023
1 parent ed1b06b commit d31a75e
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 113 deletions.
130 changes: 68 additions & 62 deletions plugins/modules/acm_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,10 @@ def ensure_tags(client, module, resource_arn, existing_tags, tags, purge_tags):
botocore.exceptions.ClientError,
botocore.exceptions.BotoCoreError,
) as e:
module.fail_json_aws(
e, "Couldn't add tags to certificate {0}".format(resource_arn)
)
module.fail_json_aws(e, "Couldn't add tags to certificate {0}".format(resource_arn))
if tags_to_remove and not module.check_mode:
# remove_tags_from_certificate wants a list of key, value pairs, not a list of keys.
tags_list = [{'Key': key, 'Value': existing_tags.get(key)} for key in tags_to_remove]
tags_list = [{"Key": key, "Value": existing_tags.get(key)} for key in tags_to_remove]
try:
client.remove_tags_from_certificate(
CertificateArn=resource_arn,
Expand All @@ -291,9 +289,7 @@ def ensure_tags(client, module, resource_arn, existing_tags, tags, purge_tags):
botocore.exceptions.ClientError,
botocore.exceptions.BotoCoreError,
) as e:
module.fail_json_aws(
e, "Couldn't remove tags from certificate {0}".format(resource_arn)
)
module.fail_json_aws(e, "Couldn't remove tags from certificate {0}".format(resource_arn))
new_tags = deepcopy(existing_tags)
for key, value in tags_to_add.items():
new_tags[key] = value
Expand All @@ -308,15 +304,14 @@ def ensure_tags(client, module, resource_arn, existing_tags, tags, purge_tags):
# May include some lines between each chain in the cert, e.g. "Subject: ..."
# Returns True iff the chains/certs are functionally identical (including chain order)
def chain_compare(module, a, b):

chain_a_pem = pem_chain_split(module, a)
chain_b_pem = pem_chain_split(module, b)

if len(chain_a_pem) != len(chain_b_pem):
return False

# Chain length is the same
for (ca, cb) in zip(chain_a_pem, chain_b_pem):
for ca, cb in zip(chain_a_pem, chain_b_pem):
der_a = PEM_body_to_DER(module, ca)
der_b = PEM_body_to_DER(module, cb)
if der_a != der_b:
Expand All @@ -336,15 +331,16 @@ def PEM_body_to_DER(module, pem):


# Store this globally to avoid repeated recompilation
pem_chain_split_regex = re.compile(r"------?BEGIN [A-Z0-9. ]*CERTIFICATE------?([a-zA-Z0-9\+\/=\s]+)------?END [A-Z0-9. ]*CERTIFICATE------?")
pem_chain_split_regex = re.compile(
r"------?BEGIN [A-Z0-9. ]*CERTIFICATE------?([a-zA-Z0-9\+\/=\s]+)------?END [A-Z0-9. ]*CERTIFICATE------?"
)


# Use regex to split up a chain or single cert into an array of base64 encoded data
# Using "-----BEGIN CERTIFICATE-----" and "----END CERTIFICATE----"
# Noting that some chains have non-pem data in between each cert
# This function returns only what's between the headers, excluding the headers
def pem_chain_split(module, pem):

pem_arr = re.findall(pem_chain_split_regex, to_text(pem))

if len(pem_arr) == 0:
Expand All @@ -359,53 +355,55 @@ def update_imported_certificate(client, module, acm, old_cert, desired_tags):
Update the existing certificate that was previously imported in ACM.
"""
module.debug("Existing certificate found in ACM")
if ('tags' not in old_cert) or ('Name' not in old_cert['tags']):
if ("tags" not in old_cert) or ("Name" not in old_cert["tags"]):
# shouldn't happen
module.fail_json(msg="Internal error, unsure which certificate to update", certificate=old_cert)
if module.params.get('name_tag') is not None and (old_cert['tags']['Name'] != module.params.get('name_tag')):
if module.params.get("name_tag") is not None and (old_cert["tags"]["Name"] != module.params.get("name_tag")):
# This could happen if the user identified the certificate using 'certificate_arn' or 'domain_name',
# and the 'Name' tag in the AWS API does not match the ansible 'name_tag'.
module.fail_json(msg="Internal error, Name tag does not match", certificate=old_cert)
if 'certificate' not in old_cert:
if "certificate" not in old_cert:
# shouldn't happen
module.fail_json(msg="Internal error, unsure what the existing cert in ACM is", certificate=old_cert)

cert_arn = None
# Are the existing certificate in ACM and the local certificate the same?
same = True
if module.params.get('certificate') is not None:
same &= chain_compare(module, old_cert['certificate'], module.params['certificate'])
if module.params['certificate_chain']:
if module.params.get("certificate") is not None:
same &= chain_compare(module, old_cert["certificate"], module.params["certificate"])
if module.params["certificate_chain"]:
# Need to test this
# not sure if Amazon appends the cert itself to the chain when self-signed
same &= chain_compare(module, old_cert['certificate_chain'], module.params['certificate_chain'])
same &= chain_compare(module, old_cert["certificate_chain"], module.params["certificate_chain"])
else:
# When there is no chain with a cert
# it seems Amazon returns the cert itself as the chain
same &= chain_compare(module, old_cert['certificate_chain'], module.params['certificate'])
same &= chain_compare(module, old_cert["certificate_chain"], module.params["certificate"])

if same:
module.debug("Existing certificate in ACM is the same")
cert_arn = old_cert['certificate_arn']
cert_arn = old_cert["certificate_arn"]
changed = False
else:
absent_args = ['certificate', 'name_tag', 'private_key']
absent_args = ["certificate", "name_tag", "private_key"]
if sum([(module.params[a] is not None) for a in absent_args]) < 3:
module.fail_json(msg="When importing a certificate, all of 'name_tag', 'certificate' and 'private_key' must be specified")
module.fail_json(
msg="When importing a certificate, all of 'name_tag', 'certificate' and 'private_key' must be specified"
)
module.debug("Existing certificate in ACM is different, overwriting")
changed = True
if module.check_mode:
cert_arn = old_cert['certificate_arn']
cert_arn = old_cert["certificate_arn"]
# note: returned domain will be the domain of the previous cert
else:
# update cert in ACM
cert_arn = acm.import_certificate(
client,
module,
certificate=module.params['certificate'],
private_key=module.params['private_key'],
certificate_chain=module.params['certificate_chain'],
arn=old_cert['certificate_arn'],
certificate=module.params["certificate"],
private_key=module.params["private_key"],
certificate_chain=module.params["certificate_chain"],
arn=old_cert["certificate_arn"],
tags=desired_tags,
)
return (changed, cert_arn)
Expand All @@ -416,22 +414,24 @@ def import_certificate(client, module, acm, desired_tags):
Import a certificate to ACM.
"""
# Validate argument requirements
absent_args = ['certificate', 'name_tag', 'private_key']
absent_args = ["certificate", "name_tag", "private_key"]
cert_arn = None
if sum([(module.params[a] is not None) for a in absent_args]) < 3:
module.fail_json(msg="When importing a new certificate, all of 'name_tag', 'certificate' and 'private_key' must be specified")
module.fail_json(
msg="When importing a new certificate, all of 'name_tag', 'certificate' and 'private_key' must be specified"
)
module.debug("No certificate in ACM. Creating new one.")
changed = True
if module.check_mode:
domain = 'example.com'
domain = "example.com"
module.exit_json(certificate=dict(domain_name=domain), changed=True)
else:
cert_arn = acm.import_certificate(
client,
module,
certificate=module.params['certificate'],
private_key=module.params['private_key'],
certificate_chain=module.params['certificate_chain'],
certificate=module.params["certificate"],
private_key=module.params["private_key"],
certificate_chain=module.params["certificate_chain"],
tags=desired_tags,
)
return (changed, cert_arn)
Expand All @@ -441,7 +441,7 @@ def ensure_certificates_present(client, module, acm, certificates, desired_tags,
cert_arn = None
changed = False
if len(certificates) > 1:
msg = "More than one certificate with Name=%s exists in ACM in this region" % module.params['name_tag']
msg = "More than one certificate with Name=%s exists in ACM in this region" % module.params["name_tag"]
module.fail_json(msg=msg, certificates=certificates)
elif len(certificates) == 1:
# Update existing certificate that was previously imported to ACM.
Expand All @@ -452,11 +452,13 @@ def ensure_certificates_present(client, module, acm, certificates, desired_tags,

# Add/remove tags to/from certificate
try:
existing_tags = boto3_tag_list_to_ansible_dict(client.list_tags_for_certificate(CertificateArn=cert_arn)['Tags'])
existing_tags = boto3_tag_list_to_ansible_dict(
client.list_tags_for_certificate(CertificateArn=cert_arn)["Tags"]
)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
module.fail_json_aws(e, "Couldn't get tags for certificate")

purge_tags = module.params.get('purge_tags')
purge_tags = module.params.get("purge_tags")
(c, new_tags) = ensure_tags(client, module, cert_arn, existing_tags, desired_tags, purge_tags)
changed |= c
domain = acm.get_domain_of_cert(client=client, module=module, arn=cert_arn)
Expand All @@ -466,21 +468,21 @@ def ensure_certificates_present(client, module, acm, certificates, desired_tags,
def ensure_certificates_absent(client, module, acm, certificates):
for cert in certificates:
if not module.check_mode:
acm.delete_certificate(client, module, cert['certificate_arn'])
module.exit_json(arns=[cert['certificate_arn'] for cert in certificates], changed=(len(certificates) > 0))
acm.delete_certificate(client, module, cert["certificate_arn"])
module.exit_json(arns=[cert["certificate_arn"] for cert in certificates], changed=(len(certificates) > 0))


def main():
argument_spec = dict(
certificate=dict(),
certificate_arn=dict(aliases=['arn']),
certificate_arn=dict(aliases=["arn"]),
certificate_chain=dict(),
domain_name=dict(aliases=['domain']),
name_tag=dict(aliases=['name']),
domain_name=dict(aliases=["domain"]),
name_tag=dict(aliases=["name"]),
private_key=dict(no_log=True),
tags=dict(type='dict', aliases=['resource_tags']),
purge_tags=dict(type='bool', default=True),
state=dict(default='present', choices=['present', 'absent']),
tags=dict(type="dict", aliases=["resource_tags"]),
purge_tags=dict(type="bool", default=True),
state=dict(default="present", choices=["present", "absent"]),
)
module = AnsibleAWSModule(
argument_spec=argument_spec,
Expand All @@ -489,62 +491,66 @@ def main():
acm = ACMServiceManager(module)

# Check argument requirements
if module.params['state'] == 'present':
if module.params["state"] == "present":
# at least one of these should be specified.
absent_args = ['certificate_arn', 'domain_name', 'name_tag']
absent_args = ["certificate_arn", "domain_name", "name_tag"]
if sum([(module.params[a] is not None) for a in absent_args]) < 1:
for a in absent_args:
module.debug("%s is %s" % (a, module.params[a]))
module.fail_json(msg="If 'state' is specified as 'present' then at least one of 'name_tag', 'certificate_arn' or 'domain_name' must be specified")
module.fail_json(
msg="If 'state' is specified as 'present' then at least one of 'name_tag', 'certificate_arn' or 'domain_name' must be specified"
)
else: # absent
# exactly one of these should be specified
absent_args = ['certificate_arn', 'domain_name', 'name_tag']
absent_args = ["certificate_arn", "domain_name", "name_tag"]
if sum([(module.params[a] is not None) for a in absent_args]) != 1:
for a in absent_args:
module.debug("%s is %s" % (a, module.params[a]))
module.fail_json(msg="If 'state' is specified as 'absent' then exactly one of 'name_tag', 'certificate_arn' or 'domain_name' must be specified")
module.fail_json(
msg="If 'state' is specified as 'absent' then exactly one of 'name_tag', 'certificate_arn' or 'domain_name' must be specified"
)

filter_tags = None
desired_tags = None
if module.params.get('tags') is not None:
desired_tags = module.params['tags']
if module.params.get("tags") is not None:
desired_tags = module.params["tags"]
else:
# Because we're setting the Name tag, we need to explicitly not purge when tags isn't passed
module.params['purge_tags'] = False
if module.params.get('name_tag') is not None:
module.params["purge_tags"] = False
if module.params.get("name_tag") is not None:
# The module was originally implemented to filter certificates based on the 'Name' tag.
# Other tags are not used to filter certificates.
# It would make sense to replace the existing name_tag, domain, certificate_arn attributes
# with a 'filter' attribute, but that would break backwards-compatibility.
filter_tags = dict(Name=module.params['name_tag'])
filter_tags = dict(Name=module.params["name_tag"])
if desired_tags is not None:
if 'Name' in desired_tags:
if desired_tags['Name'] != module.params['name_tag']:
if "Name" in desired_tags:
if desired_tags["Name"] != module.params["name_tag"]:
module.fail_json(msg="Value of 'name_tag' conflicts with value of 'tags.Name'")
else:
desired_tags['Name'] = module.params['name_tag']
desired_tags["Name"] = module.params["name_tag"]
else:
desired_tags = deepcopy(filter_tags)

client = module.client('acm')
client = module.client("acm")

# fetch the list of certificates currently in ACM
certificates = acm.get_certificates(
client=client,
module=module,
domain_name=module.params['domain_name'],
arn=module.params['certificate_arn'],
domain_name=module.params["domain_name"],
arn=module.params["certificate_arn"],
only_tags=filter_tags,
)

module.debug("Found %d corresponding certificates in ACM" % len(certificates))
if module.params['state'] == 'present':
if module.params["state"] == "present":
ensure_certificates_present(client, module, acm, certificates, desired_tags, filter_tags)

else: # state == absent
ensure_certificates_absent(client, module, acm, certificates)


if __name__ == '__main__':
if __name__ == "__main__":
# tests()
main()
41 changes: 26 additions & 15 deletions plugins/modules/acm_certificate_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,31 +264,42 @@

def main():
argument_spec = dict(
certificate_arn=dict(aliases=['arn']),
domain_name=dict(aliases=['name']),
certificate_arn=dict(aliases=["arn"]),
domain_name=dict(aliases=["name"]),
statuses=dict(
type='list',
elements='str',
choices=['PENDING_VALIDATION', 'ISSUED', 'INACTIVE', 'EXPIRED', 'VALIDATION_TIMED_OUT', 'REVOKED', 'FAILED']
type="list",
elements="str",
choices=[
"PENDING_VALIDATION",
"ISSUED",
"INACTIVE",
"EXPIRED",
"VALIDATION_TIMED_OUT",
"REVOKED",
"FAILED",
],
),
tags=dict(type='dict'),
tags=dict(type="dict"),
)
module = AnsibleAWSModule(argument_spec=argument_spec, supports_check_mode=True)
acm_info = ACMServiceManager(module)

client = module.client('acm')
client = module.client("acm")

certificates = acm_info.get_certificates(client, module,
domain_name=module.params['domain_name'],
statuses=module.params['statuses'],
arn=module.params['certificate_arn'],
only_tags=module.params['tags'])
certificates = acm_info.get_certificates(
client,
module,
domain_name=module.params["domain_name"],
statuses=module.params["statuses"],
arn=module.params["certificate_arn"],
only_tags=module.params["tags"],
)

if module.params['certificate_arn'] and len(certificates) != 1:
module.fail_json(msg="No certificate exists in this region with ARN %s" % module.params['certificate_arn'])
if module.params["certificate_arn"] and len(certificates) != 1:
module.fail_json(msg="No certificate exists in this region with ARN %s" % module.params["certificate_arn"])

module.exit_json(certificates=certificates)


if __name__ == '__main__':
if __name__ == "__main__":
main()
Loading

0 comments on commit d31a75e

Please sign in to comment.