Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ tzlocal==5.0.1
cfn-lint~=0.80.3

# Type checking boto3 objects
boto3-stubs[apigateway,cloudformation,ecr,iam,lambda,s3,schemas,secretsmanager,signer,stepfunctions,sts,xray,sqs]==1.28.60
boto3-stubs[apigateway,cloudformation,ecr,iam,lambda,s3,schemas,secretsmanager,signer,stepfunctions,sts,xray,sqs,kinesis]==1.28.60
7 changes: 6 additions & 1 deletion requirements/reproducible-mac.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ boto3==1.28.60 \
# via
# aws-sam-cli (setup.py)
# aws-sam-translator
boto3-stubs[apigateway,cloudformation,ecr,iam,lambda,s3,schemas,secretsmanager,signer,sqs,stepfunctions,sts,xray]==1.28.60 \
boto3-stubs[apigateway,cloudformation,ecr,iam,lambda,s3,schemas,secretsmanager,signer,sqs,stepfunctions,sts,xray,kinesis]==1.28.60 \
--hash=sha256:1586dfac8f92c2c03237d99c2a412f17241084d39e89d2baa8c51cf3cb38082c \
--hash=sha256:dcadcae758b170d8fa0e6aee7b768c57686ebca93093f79e3ece6663cc3cc681
# via aws-sam-cli (setup.py)
Expand Down Expand Up @@ -415,6 +415,10 @@ mypy-boto3-iam==1.28.37 \
--hash=sha256:39bd5b8b9a48cb47d909d45c13c713c099c2f84719612a0a848d7a0497c6fcf4 \
--hash=sha256:a5ed8c70c610f2ae7ee4a32ecf87c42824bfeb1e8a8c6347d888c73383d2c61f
# via boto3-stubs
mypy-boto3-kinesis==1.28.36 \
--hash=sha256:43713c0ce8f63b2cbd181132e71371031bd90eac250aef7357b239d4da6b8504 \
--hash=sha256:ab37194c4f69fead34f1ba12dfa520ad6c1aed9236316df453e9f3c873f2420a
# via boto3-stubs
mypy-boto3-lambda==1.28.36 \
--hash=sha256:70498e6ff6bfd60b758553d27fadf691ba169572faca01c2bd457da0b48b9cff \
--hash=sha256:edb1f49279f7713929a70eaab00cf3d4ba65a10016db636805d022b2eaf14c84
Expand Down Expand Up @@ -958,6 +962,7 @@ typing-extensions==4.8.0 \
# mypy-boto3-cloudformation
# mypy-boto3-ecr
# mypy-boto3-iam
# mypy-boto3-kinesis
# mypy-boto3-lambda
# mypy-boto3-s3
# mypy-boto3-schemas
Expand Down
7 changes: 6 additions & 1 deletion samcli/commands/remote/remote_invoke_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
get_resource_summary,
get_resource_summary_from_physical_id,
)
from samcli.lib.utils.resources import AWS_LAMBDA_FUNCTION, AWS_SQS_QUEUE
from samcli.lib.utils.resources import AWS_KINESIS_STREAM, AWS_LAMBDA_FUNCTION, AWS_SQS_QUEUE
from samcli.lib.utils.stream_writer import StreamWriter

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -215,6 +215,11 @@ def _get_from_physical_resource_id(self) -> CloudFormationResourceSummary:
sqs_client = self._boto_client_provider("sqs")
resource_id = get_queue_url_from_arn(sqs_client, resource_arn.resource_id)

if SUPPORTED_SERVICES.get(service_from_arn) == AWS_KINESIS_STREAM:
# Note (hnnasit): Add unit test after AWS_KINESIS_STREAM is added to SUPPORTED_SERVICES
# StreamName extraced from arn is used as resource_id.
resource_id = resource_arn.resource_id

return CloudFormationResourceSummary(
cast(str, SUPPORTED_SERVICES.get(service_from_arn)),
resource_id,
Expand Down
132 changes: 132 additions & 0 deletions samcli/lib/remote_invoke/kinesis_invoke_executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
Remote invoke executor implementation for Kinesis streams
"""
import logging
import uuid
from dataclasses import asdict, dataclass
from typing import cast

from botocore.exceptions import ClientError, ParamValidationError
from mypy_boto3_kinesis import KinesisClient

from samcli.lib.remote_invoke.exceptions import (
ErrorBotoApiCallException,
InvalidResourceBotoParameterException,
)
from samcli.lib.remote_invoke.remote_invoke_executors import (
BotoActionExecutor,
RemoteInvokeIterableResponseType,
RemoteInvokeOutputFormat,
RemoteInvokeResponse,
)

LOG = logging.getLogger(__name__)
STREAM_NAME = "StreamName"
DATA = "Data"
PARTITION_KEY = "PartitionKey"


@dataclass
class KinesisStreamPutRecordTextOutput:
"""
Dataclass that stores put_record boto3 API fields used to create
text output.
"""

ShardId: str
SequenceNumber: str

def get_output_response_dict(self) -> dict:
"""
Returns a dict of existing dataclass fields.
Returns
-------
dict
Returns the dict of the fields that will be used as the output response for
text format output.
"""
return asdict(self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None})


class KinesisPutDataExecutor(BotoActionExecutor):
"""
Calls "put_record" method of "Kinesis stream" service with given input.
If a file location provided, the file handle will be passed as input object.
"""

_kinesis_client: KinesisClient
_stream_name: str
_remote_output_format: RemoteInvokeOutputFormat
request_parameters: dict

def __init__(self, kinesis_client: KinesisClient, physical_id: str, remote_output_format: RemoteInvokeOutputFormat):
self._kinesis_client = kinesis_client
self._remote_output_format = remote_output_format
self._stream_name = physical_id
self.request_parameters = {}

def validate_action_parameters(self, parameters: dict) -> None:
"""
Validates the input boto parameters and prepares the parameters for calling the API.
Parameters
----------
parameters: dict
Boto parameters provided as input
"""
for parameter_key, parameter_value in parameters.items():
if parameter_key == STREAM_NAME:
LOG.warning("StreamName is defined using the value provided for resource_id argument.")
elif parameter_key == DATA:
LOG.warning("Data is defined using the value provided for either --event or --event-file options.")
else:
self.request_parameters[parameter_key] = parameter_value

if PARTITION_KEY not in self.request_parameters:
self.request_parameters[PARTITION_KEY] = str(uuid.uuid4())

def _execute_action(self, payload: str) -> RemoteInvokeIterableResponseType:
"""
Calls "put_record" method to write single data record to Kinesis data stream.
Parameters
----------
payload: str
The Data record which will be sent to the Kinesis stream
Yields
------
RemoteInvokeIterableResponseType
Response that is consumed by remote invoke consumers after execution
"""
if payload:
self.request_parameters[DATA] = payload
else:
self.request_parameters[DATA] = "{}"
LOG.debug("Input event not found, putting a record with Data {}")
self.request_parameters[STREAM_NAME] = self._stream_name
LOG.debug(
"Calling kinesis_client.put_record with StreamName:%s, Data:%s",
self.request_parameters[STREAM_NAME],
payload,
)
try:
put_record_response = cast(dict, self._kinesis_client.put_record(**self.request_parameters))

if self._remote_output_format == RemoteInvokeOutputFormat.JSON:
yield RemoteInvokeResponse(put_record_response)
if self._remote_output_format == RemoteInvokeOutputFormat.TEXT:
put_record_text_output = KinesisStreamPutRecordTextOutput(
ShardId=put_record_response["ShardId"],
SequenceNumber=put_record_response["SequenceNumber"],
)
output_data = put_record_text_output.get_output_response_dict()
yield RemoteInvokeResponse(output_data)
except ParamValidationError as param_val_ex:
raise InvalidResourceBotoParameterException(
f"Invalid parameter key provided."
f" {str(param_val_ex).replace(f'{STREAM_NAME}, ', '').replace(f'{DATA}, ', '')}"
)
except ClientError as client_ex:
raise ErrorBotoApiCallException(client_ex) from client_ex
39 changes: 39 additions & 0 deletions samcli/lib/remote_invoke/remote_invoke_executor_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from typing import Any, Callable, Dict, Optional

from samcli.lib.remote_invoke.kinesis_invoke_executors import KinesisPutDataExecutor
from samcli.lib.remote_invoke.lambda_invoke_executors import (
DefaultConvertToJSON,
LambdaInvokeExecutor,
Expand Down Expand Up @@ -224,6 +225,44 @@ def _create_sqs_boto_executor(
log_consumer=log_consumer,
)

def _create_kinesis_boto_executor(
self,
cfn_resource_summary: CloudFormationResourceSummary,
remote_invoke_output_format: RemoteInvokeOutputFormat,
response_consumer: RemoteInvokeConsumer[RemoteInvokeResponse],
log_consumer: RemoteInvokeConsumer[RemoteInvokeLogOutput],
) -> RemoteInvokeExecutor:
"""Creates a remote invoke executor for Kinesis resource type based on
the boto action being called.

Parameters
----------
cfn_resource_summary: CloudFormationResourceSummary
Information about the Kinesis stream resource
remote_invoke_output_format: RemoteInvokeOutputFormat
Response output format that will be used for remote invoke execution
response_consumer: RemoteInvokeConsumer[RemoteInvokeResponse]
Consumer instance which can process RemoteInvokeResponse events
log_consumer: RemoteInvokeConsumer[RemoteInvokeLogOutput]
Consumer instance which can process RemoteInvokeLogOutput events

Returns
-------
RemoteInvokeExecutor
Returns the Executor created for Kinesis stream
"""
LOG.info("Putting record to Kinesis data stream %s", cfn_resource_summary.logical_resource_id)
kinesis_client = self._boto_client_provider("kinesis")
return RemoteInvokeExecutor(
request_mappers=[DefaultConvertToJSON()],
response_mappers=[ResponseObjectToJsonStringMapper()],
boto_action_executor=KinesisPutDataExecutor(
kinesis_client, cfn_resource_summary.physical_resource_id, remote_invoke_output_format
),
response_consumer=response_consumer,
log_consumer=log_consumer,
)

# mapping definition for each supported resource type
REMOTE_INVOKE_EXECUTOR_MAPPING: Dict[
str,
Expand Down
142 changes: 142 additions & 0 deletions tests/unit/lib/remote_invoke/test_kinesis_invoke_executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
from unittest import TestCase
from unittest.mock import patch, Mock

from parameterized import parameterized, parameterized_class
from samcli.lib.remote_invoke.kinesis_invoke_executors import (
RemoteInvokeOutputFormat,
KinesisPutDataExecutor,
ParamValidationError,
InvalidResourceBotoParameterException,
ErrorBotoApiCallException,
ClientError,
KinesisStreamPutRecordTextOutput,
)
from samcli.lib.remote_invoke.remote_invoke_executors import RemoteInvokeResponse


class TestKinesisStreamPutRecordTextOutput(TestCase):
@parameterized.expand(
[
("mock-shard-id", "mock-sequence-number"),
]
)
def test_kinesis_put_record_text_output(self, shard_id, sequence_number):
text_output = KinesisStreamPutRecordTextOutput(ShardId=shard_id, SequenceNumber=sequence_number)
self.assertEqual(text_output.ShardId, shard_id)
self.assertEqual(text_output.SequenceNumber, sequence_number)

@parameterized.expand(
[
(
"mock-shard-id",
"mock-sequence-number",
{
"ShardId": "mock-shard-id",
"SequenceNumber": "mock-sequence-number",
},
),
]
)
def test_get_output_response_dict(self, shard_id, sequence_number, expected_output):
text_output = KinesisStreamPutRecordTextOutput(ShardId=shard_id, SequenceNumber=sequence_number)
output_response_dict = text_output.get_output_response_dict()
self.assertEqual(output_response_dict, expected_output)


@parameterized_class(
"output",
[[RemoteInvokeOutputFormat.TEXT], [RemoteInvokeOutputFormat.JSON]],
)
class TestKinesisPutDataExecutor(TestCase):
output: RemoteInvokeOutputFormat

def setUp(self) -> None:
self.kinesis_client = Mock()
self.stream_name = "mock-kinesis-stream"
self.kinesis_put_data_executor = KinesisPutDataExecutor(self.kinesis_client, self.stream_name, self.output)

@patch("samcli.lib.remote_invoke.kinesis_invoke_executors.uuid")
def test_execute_action_successful(self, patched_uuid):
mock_uuid_value = "patched-uuid-value"
patched_uuid.uuid4.return_value = mock_uuid_value
given_input_data = "hello world"
mock_shard_id = "shardId-000000000000"
mock_sequence_number = "2941492a-5847-4ebb-a8a3-58c07ce9f198"
mock_text_response = {
"ShardId": mock_shard_id,
"SequenceNumber": mock_sequence_number,
}

mock_json_response = {
"ShardId": mock_shard_id,
"SequenceNumber": mock_sequence_number,
"ResponseMetadata": {},
}
self.kinesis_client.put_record.return_value = {
"ShardId": mock_shard_id,
"SequenceNumber": mock_sequence_number,
"ResponseMetadata": {},
}
self.kinesis_put_data_executor.validate_action_parameters({})
result = self.kinesis_put_data_executor._execute_action(given_input_data)
if self.output == RemoteInvokeOutputFormat.JSON:
self.assertEqual(list(result), [RemoteInvokeResponse(mock_json_response)])
else:
self.assertEqual(list(result), [RemoteInvokeResponse(mock_text_response)])

self.kinesis_client.put_record.assert_called_with(
Data=given_input_data, StreamName=self.stream_name, PartitionKey=mock_uuid_value
)

@parameterized.expand(
[
({}, {"PartitionKey": "mock-uuid-value"}),
(
{"ExplicitHashKey": "mock-explicit-hash-key", "SequenceNumberForOrdering": "1"},
{
"PartitionKey": "mock-uuid-value",
"ExplicitHashKey": "mock-explicit-hash-key",
"SequenceNumberForOrdering": "1",
},
),
(
{
"PartitionKey": "override-partition-key",
},
{
"PartitionKey": "override-partition-key",
},
),
(
{"StreamName": "mock-stream-name", "Data": "mock-data"},
{"PartitionKey": "mock-uuid-value"},
),
(
{"invalidParameterKey": "invalidParameterValue"},
{"invalidParameterKey": "invalidParameterValue", "PartitionKey": "mock-uuid-value"},
),
]
)
@patch("samcli.lib.remote_invoke.kinesis_invoke_executors.uuid")
def test_validate_action_parameters(self, parameters, expected_boto_parameters, patched_uuid):
mock_uuid_value = "mock-uuid-value"
patched_uuid.uuid4.return_value = mock_uuid_value
self.kinesis_put_data_executor.validate_action_parameters(parameters)
self.assertEqual(self.kinesis_put_data_executor.request_parameters, expected_boto_parameters)

@parameterized.expand(
[
(ParamValidationError(report="Invalid parameters"), InvalidResourceBotoParameterException),
(
ClientError(error_response={"Error": {"Code": "MockException"}}, operation_name="send_message"),
ErrorBotoApiCallException,
),
]
)
def test_execute_action_put_record_throws_boto_errors(self, boto_error, expected_error_thrown):
given_input_message = "hello world"
self.kinesis_client.put_record.side_effect = boto_error
with self.assertRaises(expected_error_thrown):
self.kinesis_put_data_executor.validate_action_parameters({})
for _ in self.kinesis_put_data_executor._execute_action(given_input_message):
pass
Loading