Skip to content

Commit

Permalink
feat: add support for topic description and ownership
Browse files Browse the repository at this point in the history
Add capability to set description and owner group per topic.
  • Loading branch information
roope-kar committed Sep 13, 2024
1 parent 1e1ae6a commit 62c6fef
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ nav_order: 1
- Use `ServiceGet` from the code-generated client
- Use the code-generated client to manage `aiven_service_integration` and `aiven_service_integration_endpoint`
- Use Go 1.23
- Add capability to set description and owner group per topic.

## [4.24.0] - 2024-08-21

Expand Down
2 changes: 2 additions & 0 deletions docs/data-sources/kafka_topic.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ data "aiven_kafka_topic" "example_topic" {

- `config` (List of Object) [Advanced parameters](https://aiven.io/docs/products/kafka/reference/advanced-params) to configure topics. (see [below for nested schema](#nestedatt--config))
- `id` (String) The ID of this resource.
- `owner_user_group_id` (String) The user group that is the owner of the topic
- `partitions` (Number) The number of partitions to create in the topic.
- `replication` (Number) The replication factor for the topic.
- `tag` (Set of Object) Tags for the topic. (see [below for nested schema](#nestedatt--tag))
- `termination_protection` (Boolean) Prevents topics from being deleted by Terraform. It's recommended for topics containing critical data. **Topics can still be deleted in the Aiven Console.**
- `topic_description` (String) The description of the topic

<a id="nestedatt--config"></a>
### Nested Schema for `config`
Expand Down
2 changes: 2 additions & 0 deletions docs/resources/kafka_topic.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ resource "aiven_kafka_topic" "example_topic" {
### Optional

- `config` (Block List, Max: 1) [Advanced parameters](https://aiven.io/docs/products/kafka/reference/advanced-params) to configure topics. (see [below for nested schema](#nestedblock--config))
- `owner_user_group_id` (String) The user group that is the owner of the topic
- `tag` (Block Set) Tags for the topic. (see [below for nested schema](#nestedblock--tag))
- `termination_protection` (Boolean) Prevents topics from being deleted by Terraform. It's recommended for topics containing critical data. **Topics can still be deleted in the Aiven Console.**
- `timeouts` (Block, Optional) (see [below for nested schema](#nestedblock--timeouts))
- `topic_description` (String) The description of the topic

### Read-Only

Expand Down
45 changes: 36 additions & 9 deletions internal/sdkprovider/service/kafkatopic/kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ var aivenKafkaTopicSchema = map[string]*schema.Schema{
Required: true,
Description: "The replication factor for the topic.",
},
"topic_description": {
Type: schema.TypeString,
Optional: true,
Description: "The description of the topic",
},
"owner_user_group_id": {
Type: schema.TypeString,
Optional: true,
Description: "The user group that is the owner of the topic",
},
"termination_protection": {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -308,18 +318,22 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
topicName := d.Get("topic_name").(string)
partitions := d.Get("partitions").(int)
replication := d.Get("replication").(int)
topicDescription := d.Get("topic_description").(string)
ownerUserGroupID := d.Get("owner_user_group_id").(string)

config, err := getKafkaTopicConfig(d)
if err != nil {
return diag.Errorf("config to json error: %s", err)
}

createRequest := aiven.CreateKafkaTopicRequest{
Partitions: &partitions,
Replication: &replication,
TopicName: topicName,
Config: config,
Tags: getTags(d),
Partitions: &partitions,
Replication: &replication,
TopicName: topicName,
Config: config,
Tags: getTags(d),
TopicDescription: &topicDescription,
OwnerUserGroupId: &ownerUserGroupID,
}

client := m.(*aiven.Client)
Expand Down Expand Up @@ -464,6 +478,14 @@ func resourceKafkaTopicRead(ctx context.Context, d *schema.ResourceData, m inter
return diag.Errorf("error setting Kafka Topic Tags for resource %s: %s", d.Id(), err)
}

if err := d.Set("topic_description", topic.TopicDescription); err != nil {
return diag.FromErr(err)
}

if err := d.Set("owner_user_group_id", topic.OwnerUserGroupId); err != nil {
return diag.FromErr(err)
}

return nil
}

Expand Down Expand Up @@ -499,17 +521,22 @@ func resourceKafkaTopicUpdate(ctx context.Context, d *schema.ResourceData, m int
return diag.Errorf("config to json error: %s", err)
}

topicDescription := d.Get("topic_description").(string)
ownerUserGroupID := d.Get("owner_user_group_id").(string)

client := m.(*aiven.Client)
err = kafkatopicrepository.New(client.KafkaTopics).Update(
ctx,
projectName,
serviceName,
topicName,
aiven.UpdateKafkaTopicRequest{
Partitions: &partitions,
Replication: schemautil.OptionalIntPointer(d, "replication"),
Config: config,
Tags: getTags(d),
Partitions: &partitions,
Replication: schemautil.OptionalIntPointer(d, "replication"),
Config: config,
Tags: getTags(d),
TopicDescription: &topicDescription,
OwnerUserGroupId: &ownerUserGroupID,
},
)
if err != nil {
Expand Down
128 changes: 106 additions & 22 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,64 @@ func TestAccAivenKafkaTopic_custom_timeouts(t *testing.T) {
})
}

func TestAccAivenKafkaTopic_with_owner_and_description(t *testing.T) {
resourceName := "aiven_kafka_topic.foo"
rName := acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum)
resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acc.TestAccPreCheck(t) },
ProtoV6ProviderFactories: acc.TestProtoV6ProviderFactories,
CheckDestroy: testAccCheckAivenKafkaTopicResourceDestroy,
Steps: []resource.TestStep{
{
Config: testAccKafkaTopicWithOwnerAndDesctiptionResource(rName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(resourceName, "topic_description", fmt.Sprintf("test-acc-topic-desc-%s", rName)),
resource.TestCheckResourceAttrSet(resourceName, "owner_user_group_id"),
),
},
},
})
}

func testAccKafkaTopicWithOwnerAndDesctiptionResource(name string) string {
return fmt.Sprintf(`
data "aiven_organization" "foo" {
name = "%s"
}
resource "aiven_organization_user_group" "foo" {
name = "test-acc-u-grp-%s"
organization_id = data.aiven_organization.foo.id
description = "test"
}
data "aiven_project" "foo" {
project = "%s"
}
resource "aiven_kafka" "bar" {
project = data.aiven_project.foo.project
cloud_name = "google-europe-west1"
plan = "startup-2"
service_name = "test-acc-sr-%s"
maintenance_window_dow = "monday"
maintenance_window_time = "10:00:00"
}
resource "aiven_kafka_topic" "foo" {
project = data.aiven_project.foo.project
service_name = aiven_kafka.bar.service_name
topic_name = "test-acc-topic-%s"
topic_description = "test-acc-topic-desc-%s"
owner_user_group_id = aiven_organization_user_group.foo.group_id
partitions = 3
replication = 2
}
`, os.Getenv("AIVEN_ORGANIZATION_NAME"), name, os.Getenv("AIVEN_PROJECT_NAME"), name, name, name)
}

func testAccKafka451TopicResource(name string) string {
return testAccKafkaTopicResource(name) + `
resource "aiven_kafka_topic" "more" {
Expand All @@ -138,6 +196,16 @@ resource "aiven_kafka_topic" "more" {

func testAccKafkaTopicResource(name string) string {
return fmt.Sprintf(`
data "aiven_organization" "foo" {
name = "%s"
}
resource "aiven_organization_user_group" "foo" {
name = "test-acc-u-grp-%s"
organization_id = data.aiven_organization.foo.id
description = "test"
}
data "aiven_project" "foo" {
project = "%s"
}
Expand Down Expand Up @@ -173,7 +241,7 @@ data "aiven_kafka_topic" "topic" {
topic_name = aiven_kafka_topic.foo.topic_name
depends_on = [aiven_kafka_topic.foo]
}`, os.Getenv("AIVEN_PROJECT_NAME"), name, name)
}`, name, os.Getenv("AIVEN_PROJECT_NAME"), name, name, name)
}

func testAccKafkaTopicCustomTimeoutsResource(name string) string {
Expand Down Expand Up @@ -294,35 +362,51 @@ func testAccCheckAivenKafkaTopicResourceDestroy(s *terraform.State) error {

ctx := context.Background()

// loop through the resources in state, verifying each kafka topic is destroyed
// loop through the resources in state, verifying each created resource is destroyed
for _, rs := range s.RootModule().Resources {
if rs.Type != "aiven_kafka_topic" {
continue
}
if rs.Type == "aiven_kafka_topic" {
project, serviceName, topicName, err := schemautil.SplitResourceID3(rs.Primary.ID)
if err != nil {
return err
}

project, serviceName, topicName, err := schemautil.SplitResourceID3(rs.Primary.ID)
if err != nil {
return err
}
_, err = c.Services.Get(ctx, project, serviceName)
if err != nil {
if aiven.IsNotFound(err) {
return nil
}
return err
}

_, err = c.Services.Get(ctx, project, serviceName)
if err != nil {
if aiven.IsNotFound(err) {
return nil
t, err := c.KafkaTopics.Get(ctx, project, serviceName, topicName)
if err != nil {
if aiven.IsNotFound(err) {
return nil
}
return err
}
return err
}

t, err := c.KafkaTopics.Get(ctx, project, serviceName, topicName)
if err != nil {
if aiven.IsNotFound(err) {
return nil
if t != nil {
return fmt.Errorf("kafka topic (%s) still exists, id %s", topicName, rs.Primary.ID)
}
return err
}
if rs.Type == "aiven_organization_user_group" {
orgID, userGroupID, err := schemautil.SplitResourceID2(rs.Primary.ID)
if err != nil {
return err
}

if t != nil {
return fmt.Errorf("kafka topic (%s) still exists, id %s", topicName, rs.Primary.ID)
r, err := c.OrganizationUserGroups.Get(ctx, orgID, userGroupID)
if err != nil {
if aiven.IsNotFound(err) {
return nil
}
return err
}

if r != nil {
return fmt.Errorf("organization user group (%s) still exists", rs.Primary.ID)
}
}
}

Expand Down

0 comments on commit 62c6fef

Please sign in to comment.