diff --git a/.changelog/25684.txt b/.changelog/25684.txt new file mode 100644 index 00000000000..35ab85feb24 --- /dev/null +++ b/.changelog/25684.txt @@ -0,0 +1,3 @@ +```release-note:new-resource +aws_msk_serverless_cluster +``` \ No newline at end of file diff --git a/internal/provider/provider.go b/internal/provider/provider.go index cb645e759f2..579d8888de9 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -1620,6 +1620,7 @@ func New(_ context.Context) (*schema.Provider, error) { "aws_msk_cluster": kafka.ResourceCluster(), "aws_msk_configuration": kafka.ResourceConfiguration(), "aws_msk_scram_secret_association": kafka.ResourceScramSecretAssociation(), + "aws_msk_serverless_cluster": kafka.ResourceServerlessCluster(), "aws_mskconnect_connector": kafkaconnect.ResourceConnector(), "aws_mskconnect_custom_plugin": kafkaconnect.ResourceCustomPlugin(), diff --git a/internal/service/kafka/cluster.go b/internal/service/kafka/cluster.go index 64dace2a790..633b192d6ee 100644 --- a/internal/service/kafka/cluster.go +++ b/internal/service/kafka/cluster.go @@ -925,7 +925,7 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta int if d.HasChange("tags_all") { o, n := d.GetChange("tags_all") - if err := UpdateTags(conn, d.Id(), o, n); err != nil { + if err := UpdateTagsWithContext(ctx, conn, d.Id(), o, n); err != nil { return diag.Errorf("updating MSK Cluster (%s) tags: %s", d.Id(), err) } } diff --git a/internal/service/kafka/cluster_test.go b/internal/service/kafka/cluster_test.go index 5db516f6f77..4b2979013c6 100644 --- a/internal/service/kafka/cluster_test.go +++ b/internal/service/kafka/cluster_test.go @@ -110,6 +110,77 @@ func TestAccKafkaCluster_basic(t *testing.T) { }) } +func TestAccKafkaCluster_disappears(t *testing.T) { + var cluster kafka.ClusterInfo + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_msk_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + 
PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccClusterConfig_basic(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckClusterExists(resourceName, &cluster), + acctest.CheckResourceDisappears(acctest.Provider, tfkafka.ResourceCluster(), resourceName), + ), + ExpectNonEmptyPlan: true, + }, + }, + }) +} + +func TestAccKafkaCluster_tags(t *testing.T) { + var cluster kafka.ClusterInfo + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_msk_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccClusterConfig_tags1(rName, "key1", "value1"), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckClusterExists(resourceName, &cluster), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "current_version", + }, + }, + { + Config: testAccClusterConfig_tags2(rName, "key1", "value1updated", "key2", "value2"), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckClusterExists(resourceName, &cluster), + resource.TestCheckResourceAttr(resourceName, "tags.%", "2"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1updated"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + { + Config: testAccClusterConfig_tags1(rName, "key2", "value2"), + Check: 
resource.ComposeAggregateTestCheckFunc( + testAccCheckClusterExists(resourceName, &cluster), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + }, + }) +} + func TestAccKafkaCluster_BrokerNodeGroupInfo_ebsVolumeSize(t *testing.T) { var cluster1, cluster2 kafka.ClusterInfo rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) @@ -1090,54 +1161,6 @@ func TestAccKafkaCluster_kafkaVersionUpgradeWithInfo(t *testing.T) { }) } -func TestAccKafkaCluster_tags(t *testing.T) { - var cluster kafka.ClusterInfo - rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) - resourceName := "aws_msk_cluster.test" - - resource.ParallelTest(t, resource.TestCase{ - PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) }, - ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID), - ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, - CheckDestroy: testAccCheckClusterDestroy, - Steps: []resource.TestStep{ - { - Config: testAccClusterConfig_tags1(rName, "key1", "value1"), - Check: resource.ComposeAggregateTestCheckFunc( - testAccCheckClusterExists(resourceName, &cluster), - resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), - resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"), - ), - }, - { - ResourceName: resourceName, - ImportState: true, - ImportStateVerify: true, - ImportStateVerifyIgnore: []string{ - "current_version", - }, - }, - { - Config: testAccClusterConfig_tags2(rName, "key1", "value1updated", "key2", "value2"), - Check: resource.ComposeAggregateTestCheckFunc( - testAccCheckClusterExists(resourceName, &cluster), - resource.TestCheckResourceAttr(resourceName, "tags.%", "2"), - resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1updated"), - resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), - ), - }, - { - Config: testAccClusterConfig_tags1(rName, "key2", "value2"), - Check: 
resource.ComposeAggregateTestCheckFunc( - testAccCheckClusterExists(resourceName, &cluster), - resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), - resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), - ), - }, - }, - }) -} - func testAccCheckResourceAttrIsSortedCSV(resourceName, attributeName string) resource.TestCheckFunc { return func(s *terraform.State) error { is, err := acctest.PrimaryInstanceState(s, resourceName) diff --git a/internal/service/kafka/find.go b/internal/service/kafka/find.go index 3be69fbacb5..fd53a0cd17d 100644 --- a/internal/service/kafka/find.go +++ b/internal/service/kafka/find.go @@ -35,6 +35,31 @@ func FindClusterByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafk return output.ClusterInfo, nil } +func findClusterV2ByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafka.Cluster, error) { + input := &kafka.DescribeClusterV2Input{ + ClusterArn: aws.String(arn), + } + + output, err := conn.DescribeClusterV2WithContext(ctx, input) + + if tfawserr.ErrCodeEquals(err, kafka.ErrCodeNotFoundException) { + return nil, &resource.NotFoundError{ + LastError: err, + LastRequest: input, + } + } + + if err != nil { + return nil, err + } + + if output == nil || output.ClusterInfo == nil { + return nil, tfresource.NewEmptyResultError(input) + } + + return output.ClusterInfo, nil +} + func FindClusterOperationByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafka.ClusterOperationInfo, error) { input := &kafka.DescribeClusterOperationInput{ ClusterOperationArn: aws.String(arn), @@ -102,3 +127,17 @@ func FindScramSecrets(conn *kafka.Kafka, clusterArn string) ([]*string, error) { return scramSecrets, err } + +func FindServerlessClusterByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafka.Cluster, error) { + output, err := findClusterV2ByARN(ctx, conn, arn) + + if err != nil { + return nil, err + } + + if output.Serverless == nil { + return nil, tfresource.NewEmptyResultError(arn) + } + + 
return output, nil +} diff --git a/internal/service/kafka/generate.go b/internal/service/kafka/generate.go index 12118309dec..7f276140e4e 100644 --- a/internal/service/kafka/generate.go +++ b/internal/service/kafka/generate.go @@ -1,4 +1,4 @@ -//go:generate go run ../../generate/tags/main.go -ListTags -ServiceTagsMap -UpdateTags +//go:generate go run ../../generate/tags/main.go -ServiceTagsMap -UpdateTags // ONLY generate directives and package declaration! Do not add anything else to this file. package kafka diff --git a/internal/service/kafka/serverless_cluster.go b/internal/service/kafka/serverless_cluster.go new file mode 100644 index 00000000000..0a0ed896dda --- /dev/null +++ b/internal/service/kafka/serverless_cluster.go @@ -0,0 +1,369 @@ +package kafka + +import ( + "context" + "log" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kafka" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" + "github.com/hashicorp/terraform-provider-aws/internal/conns" + "github.com/hashicorp/terraform-provider-aws/internal/flex" + tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" + "github.com/hashicorp/terraform-provider-aws/internal/tfresource" + "github.com/hashicorp/terraform-provider-aws/internal/verify" +) + +func ResourceServerlessCluster() *schema.Resource { + return &schema.Resource{ + CreateWithoutTimeout: resourceServerlessClusterCreate, + ReadWithoutTimeout: resourceServerlessClusterRead, + UpdateWithoutTimeout: resourceServerlessClusterUpdate, + DeleteWithoutTimeout: resourceClusterDelete, + + Importer: &schema.ResourceImporter{ + State: schema.ImportStatePassthrough, + }, + + Timeouts: &schema.ResourceTimeout{ + Create: schema.DefaultTimeout(120 * time.Minute), + Delete: schema.DefaultTimeout(120 * time.Minute), + }, + + CustomizeDiff: verify.SetTagsDiff, + + Schema: 
map[string]*schema.Schema{ + "arn": { + Type: schema.TypeString, + Computed: true, + }, + "client_authentication": { + Type: schema.TypeList, + Required: true, + ForceNew: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "sasl": { + Type: schema.TypeList, + Required: true, + ForceNew: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "iam": { + Type: schema.TypeList, + Required: true, + ForceNew: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "enabled": { + Type: schema.TypeBool, + Required: true, + ForceNew: true, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + "cluster_name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validation.StringLenBetween(1, 64), + }, + "tags": tftags.TagsSchema(), + "tags_all": tftags.TagsSchemaComputed(), + "vpc_config": { + Type: schema.TypeList, + Required: true, + ForceNew: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "security_group_ids": { + Type: schema.TypeSet, + Optional: true, + ForceNew: true, + MaxItems: 5, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, + "subnet_ids": { + Type: schema.TypeSet, + Required: true, + ForceNew: true, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + }, + }, + }, + }, + }, + } +} + +func resourceServerlessClusterCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + conn := meta.(*conns.AWSClient).KafkaConn + defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig + tags := defaultTagsConfig.MergeTags(tftags.New(d.Get("tags").(map[string]interface{}))) + + name := d.Get("cluster_name").(string) + input := &kafka.CreateClusterV2Input{ + ClusterName: aws.String(name), + Serverless: &kafka.ServerlessRequest{ + ClientAuthentication: expandServerlessClientAuthentication(d.Get("client_authentication").([]interface{})[0].(map[string]interface{})), + VpcConfigs: 
expandVpcConfigs(d.Get("vpc_config").([]interface{})), + }, + Tags: Tags(tags.IgnoreAWS()), + } + + log.Printf("[DEBUG] Creating MSK Serverless Cluster: %s", input) + output, err := conn.CreateClusterV2WithContext(ctx, input) + + if err != nil { + return diag.Errorf("creating MSK Serverless Cluster (%s): %s", name, err) + } + + d.SetId(aws.StringValue(output.ClusterArn)) + + _, err = waitClusterCreated(ctx, conn, d.Id(), d.Timeout(schema.TimeoutCreate)) + + if err != nil { + return diag.Errorf("waiting for MSK Serverless Cluster (%s) create: %s", d.Id(), err) + } + + return resourceServerlessClusterRead(ctx, d, meta) +} + +func resourceServerlessClusterRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + conn := meta.(*conns.AWSClient).KafkaConn + defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig + ignoreTagsConfig := meta.(*conns.AWSClient).IgnoreTagsConfig + + cluster, err := FindServerlessClusterByARN(ctx, conn, d.Id()) + + if !d.IsNewResource() && tfresource.NotFound(err) { + log.Printf("[WARN] MSK Serverless Cluster (%s) not found, removing from state", d.Id()) + d.SetId("") + return nil + } + + if err != nil { + return diag.Errorf("reading MSK Serverless Cluster (%s): %s", d.Id(), err) + } + + d.Set("arn", cluster.ClusterArn) + if cluster.Serverless.ClientAuthentication != nil { + if err := d.Set("client_authentication", []interface{}{flattenServerlessClientAuthentication(cluster.Serverless.ClientAuthentication)}); err != nil { + return diag.Errorf("setting client_authentication: %s", err) + } + } else { + d.Set("client_authentication", nil) + } + d.Set("cluster_name", cluster.ClusterName) + if err := d.Set("vpc_config", flattenVpcConfigs(cluster.Serverless.VpcConfigs)); err != nil { + return diag.Errorf("setting vpc_config: %s", err) + } + + tags := KeyValueTags(cluster.Tags).IgnoreAWS().IgnoreConfig(ignoreTagsConfig) + + //lintignore:AWSR002 + if err := d.Set("tags", 
tags.RemoveDefaultConfig(defaultTagsConfig).Map()); err != nil { + return diag.Errorf("setting tags: %s", err) + } + + if err := d.Set("tags_all", tags.Map()); err != nil { + return diag.Errorf("setting tags_all: %s", err) + } + + return nil +} + +func resourceServerlessClusterUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + conn := meta.(*conns.AWSClient).KafkaConn + + if d.HasChange("tags_all") { + o, n := d.GetChange("tags_all") + + if err := UpdateTagsWithContext(ctx, conn, d.Id(), o, n); err != nil { + return diag.Errorf("updating MSK Serverless Cluster (%s) tags: %s", d.Id(), err) + } + } + + return resourceServerlessClusterRead(ctx, d, meta) +} + +func expandServerlessClientAuthentication(tfMap map[string]interface{}) *kafka.ServerlessClientAuthentication { + if tfMap == nil { + return nil + } + + apiObject := &kafka.ServerlessClientAuthentication{} + + if v, ok := tfMap["sasl"].([]interface{}); ok && len(v) > 0 && v[0] != nil { + apiObject.Sasl = expandServerlessSasl(v[0].(map[string]interface{})) + } + + return apiObject +} + +func expandServerlessSasl(tfMap map[string]interface{}) *kafka.ServerlessSasl { // nosemgrep:ci.caps2-in-func-name + if tfMap == nil { + return nil + } + + apiObject := &kafka.ServerlessSasl{} + + if v, ok := tfMap["iam"].([]interface{}); ok && len(v) > 0 && v[0] != nil { + apiObject.Iam = expandIam(v[0].(map[string]interface{})) + } + + return apiObject +} + +func expandIam(tfMap map[string]interface{}) *kafka.Iam { // nosemgrep:ci.caps4-in-func-name + if tfMap == nil { + return nil + } + + apiObject := &kafka.Iam{} + + if v, ok := tfMap["enabled"].(bool); ok { + apiObject.Enabled = aws.Bool(v) + } + + return apiObject +} + +func flattenServerlessClientAuthentication(apiObject *kafka.ServerlessClientAuthentication) map[string]interface{} { + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.Sasl; v != nil { + tfMap["sasl"] = 
[]interface{}{flattenServerlessSasl(v)} + } + + return tfMap +} + +func flattenServerlessSasl(apiObject *kafka.ServerlessSasl) map[string]interface{} { // nosemgrep:ci.caps2-in-func-name + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.Iam; v != nil { + tfMap["iam"] = []interface{}{flattenIam(v)} + } + + return tfMap +} + +func flattenIam(apiObject *kafka.Iam) map[string]interface{} { // nosemgrep:ci.caps4-in-func-name + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.Enabled; v != nil { + tfMap["enabled"] = aws.BoolValue(v) + } + + return tfMap +} + +func expandVpcConfig(tfMap map[string]interface{}) *kafka.VpcConfig { // nosemgrep:ci.caps5-in-func-name + if tfMap == nil { + return nil + } + + apiObject := &kafka.VpcConfig{} + + if v, ok := tfMap["security_group_ids"].(*schema.Set); ok && v.Len() > 0 { + apiObject.SecurityGroupIds = flex.ExpandStringSet(v) + } + + if v, ok := tfMap["subnet_ids"].(*schema.Set); ok && v.Len() > 0 { + apiObject.SubnetIds = flex.ExpandStringSet(v) + } + + return apiObject +} + +func expandVpcConfigs(tfList []interface{}) []*kafka.VpcConfig { // nosemgrep:ci.caps5-in-func-name + if len(tfList) == 0 { + return nil + } + + var apiObjects []*kafka.VpcConfig + + for _, tfMapRaw := range tfList { + tfMap, ok := tfMapRaw.(map[string]interface{}) + + if !ok { + continue + } + + apiObject := expandVpcConfig(tfMap) + + if apiObject == nil { + continue + } + + apiObjects = append(apiObjects, apiObject) + } + + return apiObjects +} + +func flattenVpcConfig(apiObject *kafka.VpcConfig) map[string]interface{} { // nosemgrep:ci.caps5-in-func-name + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.SecurityGroupIds; v != nil { + tfMap["security_group_ids"] = aws.StringValueSlice(v) + } + + if v := apiObject.SubnetIds; v != nil { + tfMap["subnet_ids"] = aws.StringValueSlice(v) + } + + return tfMap +} + 
+func flattenVpcConfigs(apiObjects []*kafka.VpcConfig) []interface{} { // nosemgrep:ci.caps5-in-func-name + if len(apiObjects) == 0 { + return nil + } + + var tfList []interface{} + + for _, apiObject := range apiObjects { + if apiObject == nil { + continue + } + + tfList = append(tfList, flattenVpcConfig(apiObject)) + } + + return tfList +} diff --git a/internal/service/kafka/serverless_cluster_test.go b/internal/service/kafka/serverless_cluster_test.go new file mode 100644 index 00000000000..7d8dd8c7080 --- /dev/null +++ b/internal/service/kafka/serverless_cluster_test.go @@ -0,0 +1,323 @@ +package kafka_test + +import ( + "context" + "fmt" + "regexp" + "testing" + + "github.com/aws/aws-sdk-go/service/kafka" + sdkacctest "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" + "github.com/hashicorp/terraform-provider-aws/internal/acctest" + "github.com/hashicorp/terraform-provider-aws/internal/conns" + tfkafka "github.com/hashicorp/terraform-provider-aws/internal/service/kafka" + "github.com/hashicorp/terraform-provider-aws/internal/tfresource" +) + +func TestAccKafkaServerlessCluster_basic(t *testing.T) { + var v kafka.Cluster + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_msk_serverless_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckServerlessClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccServerlessClusterConfig_basic(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckServerlessClusterExists(resourceName, &v), + acctest.MatchResourceAttrRegionalARN(resourceName, "arn", "kafka", regexp.MustCompile(`cluster/.+$`)), + 
resource.TestCheckResourceAttr(resourceName, "cluster_name", rName), + resource.TestCheckResourceAttr(resourceName, "client_authentication.#", "1"), + resource.TestCheckResourceAttr(resourceName, "client_authentication.0.sasl.#", "1"), + resource.TestCheckResourceAttr(resourceName, "client_authentication.0.sasl.0.iam.#", "1"), + resource.TestCheckResourceAttr(resourceName, "client_authentication.0.sasl.0.iam.0.enabled", "true"), + resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), + resource.TestCheckResourceAttr(resourceName, "vpc_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "vpc_config.0.security_group_ids.#", "0"), + resource.TestCheckResourceAttr(resourceName, "vpc_config.0.subnet_ids.#", "2"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func TestAccKafkaServerlessCluster_disappears(t *testing.T) { + var v kafka.Cluster + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_msk_serverless_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckServerlessClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccServerlessClusterConfig_basic(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckServerlessClusterExists(resourceName, &v), + acctest.CheckResourceDisappears(acctest.Provider, tfkafka.ResourceServerlessCluster(), resourceName), + ), + ExpectNonEmptyPlan: true, + }, + }, + }) +} + +func TestAccKafkaServerlessCluster_tags(t *testing.T) { + var v kafka.Cluster + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_msk_serverless_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) }, + 
ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckServerlessClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccServerlessClusterConfig_tags1(rName, "key1", "value1"), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckServerlessClusterExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccServerlessClusterConfig_tags2(rName, "key1", "value1updated", "key2", "value2"), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckServerlessClusterExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "tags.%", "2"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1updated"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + { + Config: testAccServerlessClusterConfig_tags1(rName, "key2", "value2"), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckServerlessClusterExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + }, + }) +} + +func TestAccKafkaServerlessCluster_securityGroup(t *testing.T) { + var v kafka.Cluster + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_msk_serverless_cluster.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckServerlessClusterDestroy, + Steps: []resource.TestStep{ + { + Config: testAccServerlessClusterConfig_securityGroup(rName), + Check: 
resource.ComposeAggregateTestCheckFunc( + testAccCheckServerlessClusterExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "vpc_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "vpc_config.0.security_group_ids.#", "1"), + resource.TestCheckResourceAttr(resourceName, "vpc_config.0.subnet_ids.#", "2"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func testAccCheckServerlessClusterDestroy(s *terraform.State) error { + conn := acctest.Provider.Meta().(*conns.AWSClient).KafkaConn + + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_msk_serverless_cluster" { + continue + } + + _, err := tfkafka.FindServerlessClusterByARN(context.Background(), conn, rs.Primary.ID) + + if tfresource.NotFound(err) { + continue + } + + if err != nil { + return err + } + + return fmt.Errorf("MSK Serverless Cluster %s still exists", rs.Primary.ID) + } + + return nil +} + +func testAccCheckServerlessClusterExists(n string, v *kafka.Cluster) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("No MSK Serverless Cluster ID is set") + } + + conn := acctest.Provider.Meta().(*conns.AWSClient).KafkaConn + + output, err := tfkafka.FindServerlessClusterByARN(context.Background(), conn, rs.Primary.ID) + + if err != nil { + return err + } + + *v = *output + + return nil + } +} + +func testAccServerlessClusterConfig_base(rName string) string { + return acctest.ConfigCompose(acctest.ConfigAvailableAZsNoOptIn(), fmt.Sprintf(` +resource "aws_vpc" "test" { + cidr_block = "10.0.0.0/16" + + enable_dns_hostnames = true + + tags = { + Name = %[1]q + } +} + +resource "aws_subnet" "test" { + count = 2 + + vpc_id = aws_vpc.test.id + availability_zone = data.aws_availability_zones.available.names[count.index] + cidr_block = 
cidrsubnet(aws_vpc.test.cidr_block, 8, count.index) + + tags = { + Name = %[1]q + } +} +`, rName)) +} + +func testAccServerlessClusterConfig_basic(rName string) string { + return acctest.ConfigCompose(testAccServerlessClusterConfig_base(rName), fmt.Sprintf(` +resource "aws_msk_serverless_cluster" "test" { + cluster_name = %[1]q + + client_authentication { + sasl { + iam { + enabled = true + } + } + } + + vpc_config { + subnet_ids = aws_subnet.test[*].id + } +} +`, rName)) +} + +func testAccServerlessClusterConfig_tags1(rName, tagKey1, tagValue1 string) string { + return acctest.ConfigCompose(testAccServerlessClusterConfig_base(rName), fmt.Sprintf(` +resource "aws_msk_serverless_cluster" "test" { + cluster_name = %[1]q + + client_authentication { + sasl { + iam { + enabled = true + } + } + } + + vpc_config { + subnet_ids = aws_subnet.test[*].id + } + + tags = { + %[2]q = %[3]q + } +} +`, rName, tagKey1, tagValue1)) +} + +func testAccServerlessClusterConfig_tags2(rName, tagKey1, tagValue1, tagKey2, tagValue2 string) string { + return acctest.ConfigCompose(testAccServerlessClusterConfig_base(rName), fmt.Sprintf(` +resource "aws_msk_serverless_cluster" "test" { + cluster_name = %[1]q + + client_authentication { + sasl { + iam { + enabled = true + } + } + } + + vpc_config { + subnet_ids = aws_subnet.test[*].id + } + + tags = { + %[2]q = %[3]q + %[4]q = %[5]q + } +} +`, rName, tagKey1, tagValue1, tagKey2, tagValue2)) +} + +func testAccServerlessClusterConfig_securityGroup(rName string) string { + return acctest.ConfigCompose(testAccServerlessClusterConfig_base(rName), fmt.Sprintf(` +resource "aws_security_group" "test" { + vpc_id = aws_vpc.test.id + + tags = { + Name = %[1]q + } +} + +resource "aws_msk_serverless_cluster" "test" { + cluster_name = %[1]q + + client_authentication { + sasl { + iam { + enabled = true + } + } + } + + vpc_config { + security_group_ids = [aws_security_group.test.id] + subnet_ids = aws_subnet.test[*].id + } +} +`, rName)) +} diff --git 
a/internal/service/kafka/status.go b/internal/service/kafka/status.go index 76482192dcb..67057783c82 100644 --- a/internal/service/kafka/status.go +++ b/internal/service/kafka/status.go @@ -11,7 +11,7 @@ import ( func statusClusterState(ctx context.Context, conn *kafka.Kafka, arn string) resource.StateRefreshFunc { return func() (interface{}, string, error) { - output, err := FindClusterByARN(ctx, conn, arn) + output, err := findClusterV2ByARN(ctx, conn, arn) if tfresource.NotFound(err) { return nil, "", nil diff --git a/internal/service/kafka/sweep.go b/internal/service/kafka/sweep.go index 87f1cd224f4..b49a13110f8 100644 --- a/internal/service/kafka/sweep.go +++ b/internal/service/kafka/sweep.go @@ -39,18 +39,18 @@ func sweepClusters(region string) error { return fmt.Errorf("error getting client: %s", err) } conn := client.(*conns.AWSClient).KafkaConn - input := &kafka.ListClustersInput{} + input := &kafka.ListClustersV2Input{} sweepResources := make([]*sweep.SweepResource, 0) - err = conn.ListClustersPages(input, func(page *kafka.ListClustersOutput, lastPage bool) bool { + err = conn.ListClustersV2Pages(input, func(page *kafka.ListClustersV2Output, lastPage bool) bool { if page == nil { return !lastPage } - for _, cluster := range page.ClusterInfoList { + for _, v := range page.ClusterInfoList { r := ResourceCluster() d := r.Data(nil) - d.SetId(aws.StringValue(cluster.ClusterArn)) + d.SetId(aws.StringValue(v.ClusterArn)) sweepResources = append(sweepResources, sweep.NewSweepResource(r, d, client)) } diff --git a/internal/service/kafka/tags_gen.go b/internal/service/kafka/tags_gen.go index e82935b2388..d573939f58b 100644 --- a/internal/service/kafka/tags_gen.go +++ b/internal/service/kafka/tags_gen.go @@ -11,27 +11,6 @@ import ( tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" ) -// ListTags lists kafka service tags. 
-// The identifier is typically the Amazon Resource Name (ARN), although -// it may also be a different identifier depending on the service. -func ListTags(conn kafkaiface.KafkaAPI, identifier string) (tftags.KeyValueTags, error) { - return ListTagsWithContext(context.Background(), conn, identifier) -} - -func ListTagsWithContext(ctx context.Context, conn kafkaiface.KafkaAPI, identifier string) (tftags.KeyValueTags, error) { - input := &kafka.ListTagsForResourceInput{ - ResourceArn: aws.String(identifier), - } - - output, err := conn.ListTagsForResourceWithContext(ctx, input) - - if err != nil { - return tftags.New(nil), err - } - - return KeyValueTags(output.Tags), nil -} - // map[string]*string handling // Tags returns kafka service tags. diff --git a/internal/service/kafka/wait.go b/internal/service/kafka/wait.go index 8795329f560..4aed6c34fa4 100644 --- a/internal/service/kafka/wait.go +++ b/internal/service/kafka/wait.go @@ -15,7 +15,7 @@ const ( configurationDeletedTimeout = 5 * time.Minute ) -func waitClusterCreated(ctx context.Context, conn *kafka.Kafka, arn string, timeout time.Duration) (*kafka.ClusterInfo, error) { +func waitClusterCreated(ctx context.Context, conn *kafka.Kafka, arn string, timeout time.Duration) (*kafka.ClusterInfo, error) { //nolint:unparam stateConf := &resource.StateChangeConf{ Pending: []string{kafka.ClusterStateCreating}, Target: []string{kafka.ClusterStateActive}, diff --git a/website/docs/d/msk_cluster.html.markdown b/website/docs/d/msk_cluster.html.markdown index e02f9c3bc33..78ae9d6936e 100644 --- a/website/docs/d/msk_cluster.html.markdown +++ b/website/docs/d/msk_cluster.html.markdown @@ -10,6 +10,8 @@ description: |- Get information on an Amazon MSK Cluster. +-> **Note:** This data source returns information on _provisioned_ clusters.
+ ## Example Usage ```terraform diff --git a/website/docs/r/msk_cluster.html.markdown b/website/docs/r/msk_cluster.html.markdown index 636cef1514a..e6e46f34595 100644 --- a/website/docs/r/msk_cluster.html.markdown +++ b/website/docs/r/msk_cluster.html.markdown @@ -3,12 +3,14 @@ subcategory: "Managed Streaming for Kafka" layout: "aws" page_title: "AWS: aws_msk_cluster" description: |- - Terraform resource for managing an AWS Managed Streaming for Kafka cluster + Terraform resource for managing an AWS Managed Streaming for Kafka cluster. --- # Resource: aws_msk_cluster -Manages AWS Managed Streaming for Kafka cluster +Manages an Amazon MSK cluster. + +-> **Note:** This resource manages _provisioned_ clusters. To manage a _serverless_ Amazon MSK cluster, use the [`aws_msk_serverless_cluster`](/docs/providers/aws/r/msk_serverless_cluster.html) resource. ## Example Usage diff --git a/website/docs/r/msk_serverless_cluster.html.markdown b/website/docs/r/msk_serverless_cluster.html.markdown new file mode 100644 index 00000000000..a8494b98f2c --- /dev/null +++ b/website/docs/r/msk_serverless_cluster.html.markdown @@ -0,0 +1,82 @@ +--- +subcategory: "Managed Streaming for Kafka" +layout: "aws" +page_title: "AWS: aws_msk_serverless_cluster" +description: |- + Terraform resource for managing an Amazon MSK Serverless cluster. +--- + +# Resource: aws_msk_serverless_cluster + +Manages an Amazon MSK Serverless cluster. + +-> **Note:** To manage a _provisioned_ Amazon MSK cluster, use the [`aws_msk_cluster`](/docs/providers/aws/r/msk_cluster.html) resource. 
+ +## Example Usage + +```terraform +resource "aws_msk_serverless_cluster" "example" { + cluster_name = "Example" + + vpc_config { + subnet_ids = aws_subnet.example[*].id + security_group_ids = [aws_security_group.example.id] + } + + client_authentication { + sasl { + iam { + enabled = true + } + } + } +} +``` + +## Argument Reference + +The following arguments are supported: + +* `client_authentication` - (Required) Specifies client authentication information for the serverless cluster. See below. +* `cluster_name` - (Required) The name of the serverless cluster. +* `tags` - (Optional) A map of tags to assign to the resource. If configured with a provider [`default_tags` configuration block](https://registry.terraform.io/providers/hashicorp/aws/latest/docs#default_tags-configuration-block) present, tags with matching keys will overwrite those defined at the provider-level. +* `vpc_config` - (Required) VPC configuration information. See below. + +### client_authentication Argument Reference + +* `sasl` - (Required) Details for client authentication using SASL. See below. + +### sasl Argument Reference + +* `iam` - (Required) Details for client authentication using IAM. See below. + +### iam Argument Reference + +* `enabled` - (Required) Whether SASL/IAM authentication is enabled or not. + +### vpc_config Argument Reference + +* `security_group_ids` - (Optional) Specifies up to five security groups that control inbound and outbound traffic for the serverless cluster. +* `subnet_ids` - (Required) A list of subnets in at least two different Availability Zones that host your client applications. + +## Attributes Reference + +In addition to all arguments above, the following attributes are exported: + +* `arn` - The ARN of the serverless cluster. 
+* `tags_all` - A map of tags assigned to the resource, including those inherited from the provider [`default_tags` configuration block](https://registry.terraform.io/providers/hashicorp/aws/latest/docs#default_tags-configuration-block). + +## Timeouts + +[Configuration options](https://www.terraform.io/docs/configuration/blocks/resources/syntax.html#operation-timeouts): + +* `create` - (Default `120m`) +* `delete` - (Default `120m`) + +## Import + +MSK serverless clusters can be imported using the cluster `arn`, e.g., + +``` +$ terraform import aws_msk_serverless_cluster.example arn:aws:kafka:us-west-2:123456789012:cluster/example/279c0212-d057-4dba-9aa9-1c4e5a25bfc7-3 +```