Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

r/aws_lambda_event_source_mapping: add filter_criteria attribute #21937

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/21937.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_lambda_event_source_mapping: Add `filter_criteria` argument
```
145 changes: 145 additions & 0 deletions internal/service/lambda/event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,30 @@ func ResourceEventSourceMapping() *schema.Resource {
ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"},
},

"filter_criteria": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"filter": {
Type: schema.TypeSet,
Optional: true,
MaxItems: 5,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"pattern": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validation.StringLenBetween(0, 4096),
},
},
},
},
},
},
},

"function_arn": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -315,6 +339,10 @@ func resourceEventSourceMappingCreate(d *schema.ResourceData, meta interface{})
target = v
}

if v, ok := d.GetOk("filter_criteria"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.FilterCriteria = expandLambdaFilterCriteria(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("function_response_types"); ok && v.(*schema.Set).Len() > 0 {
input.FunctionResponseTypes = flex.ExpandStringSet(v.(*schema.Set))
}
Expand Down Expand Up @@ -442,6 +470,13 @@ func resourceEventSourceMappingRead(d *schema.ResourceData, meta interface{}) er
d.Set("destination_config", nil)
}
d.Set("event_source_arn", eventSourceMappingConfiguration.EventSourceArn)
if v := eventSourceMappingConfiguration.FilterCriteria; v != nil {
if err := d.Set("filter_criteria", []interface{}{flattenLambdaFilterCriteria(v)}); err != nil {
return fmt.Errorf("error setting filter criteria: %w", err)
}
} else {
d.Set("filter_criteria", nil)
}
d.Set("function_arn", eventSourceMappingConfiguration.FunctionArn)
d.Set("function_name", eventSourceMappingConfiguration.FunctionArn)
d.Set("function_response_types", aws.StringValueSlice(eventSourceMappingConfiguration.FunctionResponseTypes))
Expand Down Expand Up @@ -518,6 +553,15 @@ func resourceEventSourceMappingUpdate(d *schema.ResourceData, meta interface{})
input.Enabled = aws.Bool(d.Get("enabled").(bool))
}

if d.HasChange("filter_criteria") {
if v, ok := d.GetOk("filter_criteria"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.FilterCriteria = expandLambdaFilterCriteria(v.([]interface{})[0].(map[string]interface{}))
} else {
// AWS ignores the removal if this is left as nil.
input.FilterCriteria = &lambda.FilterCriteria{}
}
}

if d.HasChange("function_name") {
input.FunctionName = aws.String(d.Get("function_name").(string))
}
Expand Down Expand Up @@ -798,3 +842,104 @@ func flattenLambdaSourceAccessConfigurations(apiObjects []*lambda.SourceAccessCo

return tfList
}

func expandLambdaFilterCriteria(tfMap map[string]interface{}) *lambda.FilterCriteria {
if tfMap == nil {
return nil
}

apiObject := &lambda.FilterCriteria{}

if v, ok := tfMap["filter"].(*schema.Set); ok && v.Len() > 0 {
apiObject.Filters = expandLambdaFilters(v.List())
}

return apiObject
}

func flattenLambdaFilterCriteria(apiObject *lambda.FilterCriteria) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.Filters; len(v) > 0 {
tfMap["filter"] = flattenLambdaFilters(v)
}

return tfMap
}

func expandLambdaFilters(tfList []interface{}) []*lambda.Filter {
if len(tfList) == 0 {
return nil
}

var apiObjects []*lambda.Filter

for _, tfMapRaw := range tfList {
tfMap, ok := tfMapRaw.(map[string]interface{})

if !ok {
continue
}

apiObject := expandLambdaFilter(tfMap)

if apiObject == nil {
continue
}

apiObjects = append(apiObjects, apiObject)
}

return apiObjects
}

func flattenLambdaFilters(apiObjects []*lambda.Filter) []interface{} {
if len(apiObjects) == 0 {
return nil
}

var tfList []interface{}

for _, apiObject := range apiObjects {
if apiObject == nil {
continue
}

tfList = append(tfList, flattenLambdaFilter(apiObject))
}

return tfList
}

func expandLambdaFilter(tfMap map[string]interface{}) *lambda.Filter {
if tfMap == nil {
return nil
}

apiObject := &lambda.Filter{}

if v, ok := tfMap["pattern"].(string); ok {
// The API permits patterns of length >= 0, so accept the empty string.
apiObject.Pattern = aws.String(v)
}

return apiObject
}

func flattenLambdaFilter(apiObject *lambda.Filter) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.Pattern; v != nil {
tfMap["pattern"] = aws.StringValue(v)
}

return tfMap
}
114 changes: 114 additions & 0 deletions internal/service/lambda/event_source_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestAccLambdaEventSourceMapping_SQS_basic(t *testing.T) {
resource.TestCheckResourceAttrPair(resourceName, "function_name", functionResourceName, "arn"),
resource.TestCheckResourceAttrPair(resourceName, "function_arn", functionResourceName, "arn"),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "0"),
),
},
// batch_size became optional. Ensure that if the user supplies the default
Expand Down Expand Up @@ -876,6 +877,76 @@ func TestAccLambdaEventSourceMapping_rabbitMQ(t *testing.T) {
})
}

func TestAccLambdaEventSourceMapping_SQS_filterCriteria(t *testing.T) {
var conf lambda.EventSourceMappingConfiguration
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_lambda_event_source_mapping.test"
pattern1 := "{\"Region\": [{\"prefix\": \"us-\"}]}"
pattern2 := "{\"Location\": [\"New York\"], \"Day\": [\"Monday\"]}"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, lambda.EndpointsID),
Providers: acctest.Providers,
CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy,
Steps: []resource.TestStep{
{
Config: testAccEventSourceMappingSQSFilterCriteria_1(rName, pattern1),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "1"),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.0.filter.#", "1"),
resource.TestCheckTypeSetElemNestedAttrs(resourceName, "filter_criteria.0.filter.*", map[string]string{"pattern": pattern1}),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingSQSFilterCriteria_2(rName, pattern1, pattern2),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "1"),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.0.filter.#", "2"),
resource.TestCheckTypeSetElemNestedAttrs(resourceName, "filter_criteria.0.filter.*", map[string]string{"pattern": pattern1}),
resource.TestCheckTypeSetElemNestedAttrs(resourceName, "filter_criteria.0.filter.*", map[string]string{"pattern": pattern2}),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingSQSFilterCriteria_3(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "0"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingSQSFilterCriteria_1(rName, pattern1),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "1"),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.0.filter.#", "1"),
resource.TestCheckTypeSetElemNestedAttrs(resourceName, "filter_criteria.0.filter.*", map[string]string{"pattern": pattern1}),
),
},
},
})
}

func testAccCheckEventSourceMappingIsBeingDisabled(conf *lambda.EventSourceMappingConfiguration) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := acctest.Provider.Meta().(*conns.AWSClient).LambdaConn
Expand Down Expand Up @@ -1832,3 +1903,46 @@ func testAccPreCheckSecretsManager(t *testing.T) {
t.Fatalf("unexpected PreCheck error: %s", err)
}
}

func testAccEventSourceMappingSQSFilterCriteria_1(rName string, pattern1 string) string {
return acctest.ConfigCompose(testAccEventSourceMappingSQSBaseConfig(rName), fmt.Sprintf(`
resource "aws_lambda_event_source_mapping" "test" {
event_source_arn = aws_sqs_queue.test.arn
function_name = aws_lambda_function.test.arn

filter_criteria {
filter {
pattern = %q
}
}
}
`, pattern1))
}

func testAccEventSourceMappingSQSFilterCriteria_2(rName string, pattern1, pattern2 string) string {
return acctest.ConfigCompose(testAccEventSourceMappingSQSBaseConfig(rName), fmt.Sprintf(`
resource "aws_lambda_event_source_mapping" "test" {
event_source_arn = aws_sqs_queue.test.arn
function_name = aws_lambda_function.test.arn

filter_criteria {
filter {
pattern = %q
}

filter {
pattern = %q
}
}
}
`, pattern1, pattern2))
}

func testAccEventSourceMappingSQSFilterCriteria_3(rName string) string {
return acctest.ConfigCompose(testAccEventSourceMappingSQSBaseConfig(rName), `
resource "aws_lambda_event_source_mapping" "test" {
event_source_arn = aws_sqs_queue.test.arn
function_name = aws_lambda_function.test.arn
}
`)
}
29 changes: 29 additions & 0 deletions website/docs/r/lambda_event_source_mapping.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,26 @@ resource "aws_lambda_event_source_mapping" "example" {
}
```

### SQS with event filter

```terraform
resource "aws_lambda_event_source_mapping" "example" {
event_source_arn = aws_sqs_queue.sqs_queue_test.arn
function_name = aws_lambda_function.example.arn

filter_criteria {
filter {
pattern = jsonencode({
body = {
Temperature : [{ numeric : [">", 0, "<=", 100] }]
Location : ["New York"]
}
})
}
}
}
```

### Amazon MQ (ActiveMQ)

```terraform
Expand Down Expand Up @@ -132,6 +152,7 @@ resource "aws_lambda_event_source_mapping" "example" {
* `destination_config`: - (Optional) An Amazon SQS queue or Amazon SNS topic destination for failed records. Only available for stream sources (DynamoDB and Kinesis). Detailed below.
* `enabled` - (Optional) Determines if the mapping will be enabled on creation. Defaults to `true`.
* `event_source_arn` - (Optional) The event source ARN - this is required for Kinesis stream, DynamoDB stream, SQS queue, MQ broker or MSK cluster. It is incompatible with a Self Managed Kafka source.
* `filter_criteria` - (Optional) The criteria to use for [event filtering](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) Kinesis stream, DynamoDB stream, SQS queue event sources. Detailed below.
* `function_name` - (Required) The name or the ARN of the Lambda function that will be subscribing to events.
* `function_response_types` - (Optional) A list of current response type enums applied to the event source mapping for [AWS Lambda checkpointing](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting). Only available for stream sources (DynamoDB and Kinesis). Valid values: `ReportBatchItemFailures`.
* `maximum_batching_window_in_seconds` - (Optional) The maximum amount of time to gather records before invoking the function, in seconds (between 0 and 300). Records will continue to buffer (or accumulate in the case of an SQS queue event source) until either `maximum_batching_window_in_seconds` expires or `batch_size` has been met. For streaming event sources, defaults to as soon as records are available in the stream. If the batch it reads from the stream/queue only has one record in it, Lambda only sends one record to the function. Only available for stream sources (DynamoDB and Kinesis) and SQS standard queues.
Expand All @@ -154,6 +175,14 @@ resource "aws_lambda_event_source_mapping" "example" {

* `destination_arn` - (Required) The Amazon Resource Name (ARN) of the destination resource.

### filter_criteria Configuration Block

* `filter` - (Optional) A set of up to 5 filter. If an event satisfies at least one, Lambda sends the event to the function or adds it to the next batch. Detailed below.

#### filter_criteria filter Configuration Block

* `pattern` - (Optional) A filter pattern up to 4096 characters. See [Filter Rule Syntax](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax).

### self_managed_event_source Configuration Block

* `endpoints` - (Required) A map of endpoints for the self managed source. For Kafka self-managed sources, the key should be `KAFKA_BOOTSTRAP_SERVERS` and the value should be a string with a comma separated list of broker endpoints.
Expand Down