Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PR #1288/7e14dcf0 backport][stable-5] ec2_key/tests: add unit-tests #1380

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelogs/fragments/unit-tests_test_ec2_key_only.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
minor_changes:
- "ec2_key - Add unit-tests coverage (https://github.com/ansible-collections/amazon.aws/pull/1288)."
238 changes: 146 additions & 92 deletions plugins/modules/ec2_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,140 +162,178 @@

from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
from ansible_collections.amazon.aws.plugins.module_utils.core import is_boto3_error_code
from ansible_collections.amazon.aws.plugins.module_utils.core import scrub_none_parameters
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import AWSRetry
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ensure_ec2_tags
from ansible_collections.amazon.aws.plugins.module_utils.tagging import boto3_tag_specifications
from ansible_collections.amazon.aws.plugins.module_utils.tagging import boto3_tag_list_to_ansible_dict


def extract_key_data(key, key_type=None):
class Ec2KeyFailure(Exception):
def __init__(self, message=None, original_e=None):
super().__init__(message)
self.original_e = original_e
self.message = message


def _import_key_pair(ec2_client, name, key_material, tag_spec=None):
params = {
'KeyName': name,
'PublicKeyMaterial': to_bytes(key_material),
'TagSpecifications': tag_spec
}

params = scrub_none_parameters(params)

try:
key = ec2_client.import_key_pair(aws_retry=True, **params)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as err:
raise Ec2KeyFailure(err, "error importing key")
return key


def extract_key_data(key, key_type=None):
data = {
'name': key['KeyName'],
'fingerprint': key['KeyFingerprint'],
'id': key['KeyPairId'],
'tags': {},
'tags': boto3_tag_list_to_ansible_dict(key.get('Tags') or []),
# KeyMaterial is returned by create_key_pair, but not by describe_key_pairs
'private_key': key.get('KeyMaterial'),
# KeyType is only set by describe_key_pairs
'type': key.get('KeyType') or key_type
}
if 'Tags' in key:
data['tags'] = boto3_tag_list_to_ansible_dict(key['Tags'])
if 'KeyMaterial' in key:
data['private_key'] = key['KeyMaterial']
if 'KeyType' in key:
data['type'] = key['KeyType']
elif key_type:
data['type'] = key_type
return data


def get_key_fingerprint(module, ec2_client, key_material):

return scrub_none_parameters(data)


def get_key_fingerprint(check_mode, ec2_client, key_material):
'''
EC2's fingerprints are non-trivial to generate, so push this key
to a temporary name and make ec2 calculate the fingerprint for us.
http://blog.jbrowne.com/?p=23
https://forums.aws.amazon.com/thread.jspa?messageID=352828
'''

# find an unused name
name_in_use = True
while name_in_use:
random_name = "ansible-" + str(uuid.uuid4())
name_in_use = find_key_pair(module, ec2_client, random_name)

temp_key = _import_key_pair(module, ec2_client, random_name, key_material)
delete_key_pair(module, ec2_client, random_name, finish_task=False)
name_in_use = find_key_pair(ec2_client, random_name)
temp_key = _import_key_pair(ec2_client, random_name, key_material)
delete_key_pair(check_mode, ec2_client, random_name, finish_task=False)
return temp_key['KeyFingerprint']


def find_key_pair(module, ec2_client, name):

def find_key_pair(ec2_client, name):
try:
key = ec2_client.describe_key_pairs(aws_retry=True, KeyNames=[name])['KeyPairs'][0]
key = ec2_client.describe_key_pairs(aws_retry=True, KeyNames=[name])
except is_boto3_error_code('InvalidKeyPair.NotFound'):
return None
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as err: # pylint: disable=duplicate-except
module.fail_json_aws(err, msg="error finding keypair")
raise Ec2KeyFailure(err, "error finding keypair")
except IndexError:
key = None
return key

return key['KeyPairs'][0]

def create_key_pair(module, ec2_client, name, key_material, force, key_type):

tags = module.params.get('tags')
purge_tags = module.params.get('purge_tags')
key = find_key_pair(module, ec2_client, name)
tag_spec = boto3_tag_specifications(tags, ['key-pair'])
changed = False
if key:
if key_material and force:
new_fingerprint = get_key_fingerprint(module, ec2_client, key_material)
if key['KeyFingerprint'] != new_fingerprint:
changed = True
if not module.check_mode:
delete_key_pair(module, ec2_client, name, finish_task=False)
key = _import_key_pair(module, ec2_client, name, key_material, tag_spec)
key_data = extract_key_data(key)
module.exit_json(changed=True, key=key_data, msg="key pair updated")
if key_type and key_type != key['KeyType']:
changed = True
if not module.check_mode:
delete_key_pair(module, ec2_client, name, finish_task=False)
key = _create_key_pair(module, ec2_client, name, tag_spec, key_type)
key_data = extract_key_data(key, key_type)
module.exit_json(changed=True, key=key_data, msg="key pair updated")
changed |= ensure_ec2_tags(ec2_client, module, key['KeyPairId'], tags=tags, purge_tags=purge_tags)
key = find_key_pair(module, ec2_client, name)
key_data = extract_key_data(key)
module.exit_json(changed=changed, key=key_data, msg="key pair already exists")
else:
# key doesn't exist, create it now
key_data = None
if not module.check_mode:
if key_material:
key = _import_key_pair(module, ec2_client, name, key_material, tag_spec)
else:
key = _create_key_pair(module, ec2_client, name, tag_spec, key_type)
key_data = extract_key_data(key, key_type)
module.exit_json(changed=True, key=key_data, msg="key pair created")
def _create_key_pair(ec2_client, name, tag_spec, key_type):
params = {
'KeyName': name,
'TagSpecifications': tag_spec,
'KeyType': key_type,
}

params = scrub_none_parameters(params)

def _create_key_pair(module, ec2_client, name, tag_spec, key_type):
params = dict(KeyName=name)
if tag_spec:
params['TagSpecifications'] = tag_spec
if key_type:
params['KeyType'] = key_type
try:
key = ec2_client.create_key_pair(aws_retry=True, **params)
except botocore.exceptions.ClientError as err:
module.fail_json_aws(err, msg="error creating key")
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as err:
raise Ec2KeyFailure(err, "error creating key")
return key


def _import_key_pair(module, ec2_client, name, key_material, tag_spec=None):
params = dict(KeyName=name, PublicKeyMaterial=to_bytes(key_material))
if tag_spec:
params['TagSpecifications'] = tag_spec
def create_new_key_pair(ec2_client, name, key_material, key_type, tags, check_mode):
'''
key does not exist, we create new key
'''
if check_mode:
return {'changed': True, 'key': None, 'msg': 'key pair created'}

tag_spec = boto3_tag_specifications(tags, ['key-pair'])
if key_material:
key = _import_key_pair(ec2_client, name, key_material, tag_spec)
else:
key = _create_key_pair(ec2_client, name, tag_spec, key_type)
key_data = extract_key_data(key, key_type)

result = {'changed': True, 'key': key_data, 'msg': 'key pair created'}
return result


def update_key_pair_by_key_material(check_mode, ec2_client, name, key, key_material, tag_spec):
if check_mode:
return {'changed': True, 'key': None, 'msg': 'key pair updated'}
new_fingerprint = get_key_fingerprint(check_mode, ec2_client, key_material)
if key['KeyFingerprint'] != new_fingerprint:
delete_key_pair(check_mode, ec2_client, name, finish_task=False)
key = _import_key_pair(ec2_client, name, key_material, tag_spec)
key_data = extract_key_data(key)
return {'changed': True, 'key': key_data, 'msg': "key pair updated"}


def update_key_pair_by_key_type(check_mode, ec2_client, name, key_type, tag_spec):
if check_mode:
return {'changed': True, 'key': None, 'msg': 'key pair updated'}
else:
delete_key_pair(check_mode, ec2_client, name, finish_task=False)
key = _create_key_pair(ec2_client, name, tag_spec, key_type)
key_data = extract_key_data(key, key_type)
return {'changed': True, 'key': key_data, 'msg': "key pair updated"}


def _delete_key_pair(ec2_client, key_name):
try:
key = ec2_client.import_key_pair(aws_retry=True, **params)
except botocore.exceptions.ClientError as err:
module.fail_json_aws(err, msg="error importing key")
return key
ec2_client.delete_key_pair(aws_retry=True, KeyName=key_name)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as err:
raise Ec2KeyFailure(err, "error deleting key")


def delete_key_pair(module, ec2_client, name, finish_task=True):
def delete_key_pair(check_mode, ec2_client, name, finish_task=True):
key = find_key_pair(ec2_client, name)

key = find_key_pair(module, ec2_client, name)
if key:
if not module.check_mode:
try:
ec2_client.delete_key_pair(aws_retry=True, KeyName=name)
except botocore.exceptions.ClientError as err:
module.fail_json_aws(err, msg="error deleting key")
if key and check_mode:
result = {'changed': True, 'key': None, 'msg': 'key deleted'}
elif not key:
result = {'key': None, 'msg': 'key did not exist'}
else:
_delete_key_pair(ec2_client, name)
if not finish_task:
return
module.exit_json(changed=True, key=None, msg="key deleted")
module.exit_json(key=None, msg="key did not exist")
result = {'changed': True, 'key': None, 'msg': 'key deleted'}

return result


def handle_existing_key_pair_update(module, ec2_client, name, key):
key_material = module.params.get('key_material')
force = module.params.get('force')
key_type = module.params.get('key_type')
tags = module.params.get('tags')
purge_tags = module.params.get('purge_tags')
tag_spec = boto3_tag_specifications(tags, ['key-pair'])
check_mode = module.check_mode
if key_material and force:
result = update_key_pair_by_key_material(check_mode, ec2_client, name, key, key_material, tag_spec)
elif key_type and key_type != key['KeyType']:
result = update_key_pair_by_key_type(check_mode, ec2_client, name, key_type, tag_spec)
else:
changed = False
changed |= ensure_ec2_tags(ec2_client, module, key['KeyPairId'], tags=tags, purge_tags=purge_tags)
key = find_key_pair(ec2_client, name)
key_data = extract_key_data(key)
result = {'changed': changed, 'key': key_data, 'msg': 'key pair alreday exists'}
return result


def main():
Expand Down Expand Up @@ -323,16 +361,32 @@ def main():
name = module.params['name']
state = module.params.get('state')
key_material = module.params.get('key_material')
force = module.params.get('force')
key_type = module.params.get('key_type')
tags = module.params.get('tags')

result = {}

if key_type:
module.require_botocore_at_least('1.21.23', reason='to set the key_type for a keypair')
try:
if state == 'absent':
result = delete_key_pair(module.check_mode, ec2_client, name)

elif state == 'present':
# check if key already exists
key = find_key_pair(ec2_client, name)
if key:
result = handle_existing_key_pair_update(module, ec2_client, name, key)
else:
result = create_new_key_pair(ec2_client, name, key_material, key_type, tags, module.check_mode)

except Ec2KeyFailure as e:
if e.original_e:
module.fail_json_aws(e.original_e, e.message)
else:
module.fail_json(e.message)

if state == 'absent':
delete_key_pair(module, ec2_client, name)
elif state == 'present':
create_key_pair(module, ec2_client, name, key_material, force, key_type)
module.exit_json(**result)


if __name__ == '__main__':
Expand Down
Loading