Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,21 @@
2. The Lambda function invokes the CircleCI V2 API endpoint to update
environment variables, authenticating with the **CircleCI API token**
"""
import os

import boto3
import requests
from src.utils.retry import retry
from src.utils.secrets_manager_helper import retrieve_secret

CIRCLECI_URL_TEMPLATE = "https://circleci.com/api/v2/project/gh/{project_path}/envvar"
DEFAULT_REGION = "us-west-2"

# Provided by Lambda
REGION = os.environ.get("AWS_REGION", DEFAULT_REGION)


def update_environment_variables(variables: map, configuration: map, secretsmanager=None):
def update_environment_variables(variables: map, configuration: map):
"""Updates CircleCI environment variables

Args:
variables:
<list expected keys & values>
configuration:
<list expected keys & values>
secretsmanager:
(optional) reference to an AWS SecretsManager client.
Defaults to the default client for the region

Raises
KeyError: if `configuration` does not contain the expected keys
Expand All @@ -54,18 +45,10 @@ def update_environment_variables(variables: map, configuration: map, secretsmana
raise RuntimeError("Configuration is required to update CircleCI environment variables")

github_path = configuration["github_path"]
circleci_api_token_secret_arn_lambda_env_var_key = configuration[
"circleci_api_token_secret_arn_lambda_env_var_key"
circleci_api_token_secret_id_lambda_env_var_key = configuration[
"circleci_api_token_secret_id_lambda_env_var_key"
]
secret_id = os.environ.get(circleci_api_token_secret_arn_lambda_env_var_key)

if secret_id is None:
raise ValueError(
f"Lambda env var {circleci_api_token_secret_arn_lambda_env_var_key} is not set"
)

secretsmanager_client = secretsmanager or boto3.client("secretsmanager", region_name=REGION)
circleci_api_token = get_secret_value(secret_id, secretsmanager=secretsmanager_client)
circleci_api_token = retrieve_secret(circleci_api_token_secret_id_lambda_env_var_key)

for key, value in variables.items():
update_env_vars(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
from destination import circleci
from models.destination_type import DestinationType
from models.source_type import SourceType
from source_data_generator import aws_session_credential_source
from source_data_generator import (
aws_session_credential_source,
secrets_data_source,
lambda_env_var_data_source,
)


def handler(event, context, *, iam=None, sts=None, secretsmanager=None):
Expand Down Expand Up @@ -78,7 +82,7 @@ def handler(event, context, *, iam=None, sts=None, secretsmanager=None):
"type": "cci_env_variable",
"description": "Circle CI environment variable for AWS SDK iOS repo",
"github_path": "aws-amplify/aws-sdk-ios",
"circleci_api_token_secret_arn_lambda_env_var_key": "CIRCLE_CI_IOS_SDK_API_TOKEN"
"circleci_api_token_secret_id_lambda_env_var_key": "CIRCLE_CI_IOS_SDK_API_TOKEN"
}
}
}
Expand All @@ -96,19 +100,21 @@ def handler(event, context, *, iam=None, sts=None, secretsmanager=None):
destination_mapping = source["destination"]["mapping_to_destination"]
configuration = source["configuration"]

source_map = {}
if source_type == SourceType.AWS_SESSION_CREDENTIALS:
credentials = aws_session_credential_source.generate_session_credentials(configuration)
mapped_result = {}
for item in destination_mapping:
destination_key_name = item["destination_key_name"]
result_value_key = item["result_value_key"]
mapped_result[destination_key_name] = credentials[result_value_key]
source_map = aws_session_credential_source.generate_session_credentials(configuration)

elif source_type == SourceType.SECRETS_MANAGER:
mapped_result = {}
source_map = secrets_data_source.retrieve_secrets(configuration)

elif source_type == SourceType.LAMBDA_ENVIRONMENT_VARIABLE:
mapped_result = {}
source_map = lambda_env_var_data_source.retrieve_lambda_env_var_value(configuration)

mapped_result = {}
for item in destination_mapping:
destination_key_name = item["destination_key_name"]
result_value_key = item["result_value_key"]
mapped_result[destination_key_name] = source_map[result_value_key]

destination_values_map.setdefault(destination_specifier, []).append(mapped_result)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def generate_session_credentials(configuration: map, iam=None, sts=None) -> Dict
finally:
if user_credentials:
iam_client.delete_access_key(UserName=iam_username, AccessKeyId=user_credentials[0])

return session_credentials


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import os
from typing import Dict


def retrieve_lambda_env_var_value(configuration: map) -> Dict[str, str]:

if not configuration:
raise RuntimeError("Configuration is required to retrieve static data")

lambda_env_variable = configuration["lambda_env_var_key"]
static_data = os.environ.get(lambda_env_variable)
return {"result": static_data}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import json
from typing import Dict

from src.utils.secrets_manager_helper import retrieve_secret


def retrieve_secrets(configuration: map) -> Dict[str, str]:

if not configuration:
raise RuntimeError("Configuration is required to retrieve secrets")

secret_key_env_variable = configuration["secret_key_env_variable"]
secret_value = retrieve_secret(secret_key_env_variable)
try:
json_value = json.loads(secret_value)
return json_value
except (json.decoder.JSONDecodeError):
return {"result": secret_value}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import os

import boto3

DEFAULT_REGION = "us-west-2"

# Provided by Lambda
REGION = os.environ.get("AWS_REGION", DEFAULT_REGION)


def retrieve_secret(secret_id_lambda_env_var_key, secretsmanager=None):
"""Retrieve an AWS SecretsManager whose Secret Id is contained in
the specified Lambda Env Var.
"""

secretsmanager_client = secretsmanager or boto3.client("secretsmanager", region_name=REGION)
secret_id = os.environ.get(secret_id_lambda_env_var_key)

if secret_id is None:
raise ValueError(f"Lambda env var {secret_id_lambda_env_var_key} is not set")

return get_secret_value(secret_id, secretsmanager=secretsmanager_client)


def get_secret_value(secret_id: str, *, secretsmanager) -> str:
response = secretsmanager.get_secret_value(SecretId=secret_id)
secret_value = response["SecretString"]
return secret_value
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
import os
import unittest
from unittest.mock import Mock, call, patch

import botocore.session
from botocore.stub import Stubber
from src.destination import circleci

session = botocore.session.get_session()
secretsmanager = session.create_client("secretsmanager", region_name=circleci.REGION)

access_key_id = "AKIAIOSFODNN7EXAMPLE"
secret_access_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY"
session_token = (
Expand All @@ -33,24 +27,17 @@ def test_generate_credential_with_null_configuration(self):
variables=self.mock_variables(), configuration=None
)

@patch.dict(os.environ, {"CIRCLE_CI_IOS_SDK_API_TOKEN": "arn:::xxx"})
@patch("src.destination.circleci.requests.post")
def test_updates_variables(self, post):
secretsmanager_stubber = Stubber(secretsmanager)
request = {"SecretId": "arn:::xxx"}
response = {"SecretString": "SEKRET!"}
secretsmanager_stubber.add_response("get_secret_value", response, request)
@patch("src.destination.circleci.retrieve_secret")
def test_updates_variables(self, mock_retrieve_secret, post):

post.return_value = Mock()
post.return_value.status_code = 200

secretsmanager_stubber.activate()
mock_retrieve_secret.return_value = "SEKRET!"
circleci.update_environment_variables(
variables=self.mock_variables(),
configuration=self.mock_configuration(),
secretsmanager=secretsmanager,
variables=self.mock_variables(), configuration=self.mock_configuration()
)
secretsmanager_stubber.assert_no_pending_responses()

url = "https://circleci.com/api/v2/project/gh/aws-amplify/aws-sdk-ios/envvar"
header = {"Circle-Token": "SEKRET!"}
Expand All @@ -76,7 +63,7 @@ def mock_configuration(self):
"type": "cci_env_variable",
"description": "Circle CI environment variable for AWS SDK iOS repo",
"github_path": "aws-amplify/aws-sdk-ios",
"circleci_api_token_secret_arn_lambda_env_var_key": "CIRCLE_CI_IOS_SDK_API_TOKEN",
"circleci_api_token_secret_id_lambda_env_var_key": "CIRCLE_CI_IOS_SDK_API_TOKEN",
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import unittest
from unittest.mock import patch

from src.source_data_generator import secrets_data_source


class TestSecretsDataSource(unittest.TestCase):
def test_null_environment_value(self):
with self.assertRaises(RuntimeError):
secrets_data_source.retrieve_secrets(configuration=None)

@patch("src.source_data_generator.secrets_data_source.retrieve_secret")
def test_valid_result(self, mock_retrieve_secret):
mock_retrieve_secret.return_value = "SEKRET!"
configuration = {"secret_key_env_variable": "secret_key"}
result = secrets_data_source.retrieve_secrets(configuration)
self.assertIsNotNone(result)

@patch("src.source_data_generator.secrets_data_source.retrieve_secret")
def test_valid_result_string(self, mock_retrieve_secret):
mock_retrieve_secret.return_value = "SEKRET!"
configuration = {"secret_key_env_variable": "secret_key"}
result = secrets_data_source.retrieve_secrets(configuration)
self.assertTrue(isinstance(result, dict))
secret_value = result["result"]
self.assertEqual(secret_value, "SEKRET!")

@patch("src.source_data_generator.secrets_data_source.retrieve_secret")
def test_valid_result_json(self, mock_retrieve_secret):
mock_retrieve_secret.return_value = """{"GITHUB_SPM_RELEASE_USER": "user",
"GITHUB_SPM_RELEASE_TOKEN": "token"}
"""
configuration = {"secret_key_env_variable": "secret_key"}
result = secrets_data_source.retrieve_secrets(configuration)
self.assertTrue(isinstance(result, dict))

secret_value = result["GITHUB_SPM_RELEASE_USER"]
secret_token = result["GITHUB_SPM_RELEASE_TOKEN"]
self.assertEqual(secret_value, "user")
self.assertEqual(secret_token, "token")


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os
import unittest
from unittest.mock import patch

import botocore.session
from botocore.stub import Stubber
from src.utils import secrets_manager_helper

session = botocore.session.get_session()
secretsmanager = session.create_client("secretsmanager", region_name=secrets_manager_helper.REGION)


class TestSecretsManagerHelper(unittest.TestCase):
def test_null_environment_value(self):
with self.assertRaises(ValueError):
secrets_manager_helper.retrieve_secret("variable")

@patch.dict(os.environ, {"variable": "some_secret_id"})
def test_retrieve_secret(self):
mock_secret = "SEKRET!"
secretsmanager_stubber = Stubber(secretsmanager)
request = {"SecretId": "some_secret_id"}
response = {"SecretString": mock_secret}
secretsmanager_stubber.add_response("get_secret_value", response, request)
secretsmanager_stubber.activate()

secret_value = secrets_manager_helper.retrieve_secret("variable", secretsmanager)

secretsmanager_stubber.assert_no_pending_responses()

self.assertEqual(mock_secret, secret_value)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os
import unittest
from unittest.mock import patch

from src.source_data_generator import lambda_env_var_data_source


class TestSecretsDataSource(unittest.TestCase):
def test_null_environment_value(self):
with self.assertRaises(RuntimeError):
lambda_env_var_data_source.retrieve_lambda_env_var_value(configuration=None)

@patch.dict(os.environ, {"variable": "SEKRET"})
def test_valid_result(self):
configuration = {"lambda_env_var_key": "variable"}
result = lambda_env_var_data_source.retrieve_lambda_env_var_value(configuration)
self.assertIsNotNone(result)

@patch.dict(os.environ, {"variable": "SEKRET!"})
def test_valid_result_string(self):
configuration = {"lambda_env_var_key": "variable"}
result = lambda_env_var_data_source.retrieve_lambda_env_var_value(configuration)
self.assertTrue(isinstance(result, dict))
secret_value = result["result"]
self.assertEqual(secret_value, "SEKRET!")


if __name__ == "__main__":
unittest.main()