Skip to content

Commit

Permalink
feat: use codegen client with service integrations
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Sep 6, 2024
1 parent ab86c0e commit 82d0f67
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 143 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ nav_order: 1
- Add `backup_hour` and `backup_minute` to `redis_user_config` and `valkey_user_config`
- Add user configuration options for `external_s3`, `external_clickhouse` and `external_mysql` integration endpoint types
- Use `ServiceGet` from the code-generated client
- Use the code-generated client to manage `aiven_service_integration` and `aiven_service_integration_endpoint`
- Use Go 1.23

## [4.24.0] - 2024-08-21
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/hashicorp/terraform-plugin-mux v0.16.0
github.com/hashicorp/terraform-plugin-sdk/v2 v2.34.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/rs/zerolog v1.33.0
github.com/samber/lo v1.47.0
github.com/stoewer/go-strcase v1.3.0
github.com/stretchr/testify v1.9.0
Expand All @@ -38,7 +39,6 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
github.com/rs/zerolog v1.33.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
Expand Down
138 changes: 64 additions & 74 deletions internal/sdkprovider/service/serviceintegration/service_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"time"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
codegenintegrations "github.com/aiven/go-client-codegen/handler/serviceintegration"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
Expand Down Expand Up @@ -83,10 +83,10 @@ func aivenServiceIntegrationSchema() map[string]*schema.Schema {
func ResourceServiceIntegration() *schema.Resource {
return &schema.Resource{
Description: "Creates and manages an Aiven [service integration](https://aiven.io/docs/platform/concepts/service-integration).",
CreateContext: resourceServiceIntegrationCreate,
ReadContext: resourceServiceIntegrationRead,
UpdateContext: resourceServiceIntegrationUpdate,
DeleteContext: resourceServiceIntegrationDelete,
CreateContext: common.WithGenClient(resourceServiceIntegrationCreate),
ReadContext: common.WithGenClient(resourceServiceIntegrationRead),
UpdateContext: common.WithGenClient(resourceServiceIntegrationUpdate),
DeleteContext: common.WithGenClient(resourceServiceIntegrationDelete),
Importer: &schema.ResourceImporter{
StateContext: schema.ImportStatePassthroughContext,
},
Expand All @@ -109,9 +109,7 @@ func plainEndpointID(fullEndpointID *string) *string {
return &endpointID
}

func resourceServiceIntegrationCreate(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
client := m.(*aiven.Client)

func resourceServiceIntegrationCreate(ctx context.Context, d *schema.ResourceData, client avngen.Client) error {
projectName := d.Get("project").(string)
integrationType := d.Get("integration_type").(string)

Expand All @@ -120,153 +118,145 @@ func resourceServiceIntegrationCreate(ctx context.Context, d *schema.ResourceDat
// for now we support to seemlessly import preexisting 'read_replica' service integrations in the resource create
// all other integrations should be imported using `terraform import`
if integrationType == "read_replica" {
if preexisting, err := resourceServiceIntegrationCheckForPreexistingResource(ctx, d, m); err != nil {
return diag.Errorf("unable to search for possible preexisting 'read_replica' service integration: %s", err)
if preexisting, err := resourceServiceIntegrationCheckForPreexistingResource(ctx, d, client); err != nil {
return fmt.Errorf("unable to search for possible preexisting 'read_replica' service integration: %w", err)
} else if preexisting != nil {
d.SetId(schemautil.BuildResourceID(projectName, preexisting.ServiceIntegrationID))
return resourceServiceIntegrationRead(ctx, d, m)
d.SetId(schemautil.BuildResourceID(projectName, preexisting.ServiceIntegrationId))
return resourceServiceIntegrationRead(ctx, d, client)
}
}

req := aiven.CreateServiceIntegrationRequest{
DestinationEndpointID: plainEndpointID(schemautil.OptionalStringPointer(d, "destination_endpoint_id")),
DestinationService: schemautil.OptionalStringPointer(d, "destination_service_name"),
IntegrationType: integrationType,
SourceEndpointID: plainEndpointID(schemautil.OptionalStringPointer(d, "source_endpoint_id")),
SourceService: schemautil.OptionalStringPointer(d, "source_service_name"),
req := &codegenintegrations.ServiceIntegrationCreateIn{
DestEndpointId: plainEndpointID(schemautil.OptionalStringPointer(d, "destination_endpoint_id")),
DestService: schemautil.OptionalStringPointer(d, "destination_service_name"),
IntegrationType: codegenintegrations.IntegrationType(integrationType),
SourceEndpointId: plainEndpointID(schemautil.OptionalStringPointer(d, "source_endpoint_id")),
SourceService: schemautil.OptionalStringPointer(d, "source_service_name"),
}

uc, err := converters.Expand(converters.ServiceIntegrationUserConfig, integrationType, d)
if err != nil {
return diag.FromErr(err)
return err
}

req.UserConfig = uc
if uc != nil {
req.UserConfig = &uc
}

res, err := client.ServiceIntegrations.Create(ctx, projectName, req)
res, err := client.ServiceIntegrationCreate(ctx, projectName, req)
if err != nil {
return diag.Errorf("error creating service integration: %s", err)
return fmt.Errorf("error creating service integration: %w", err)
}
d.SetId(schemautil.BuildResourceID(projectName, res.ServiceIntegrationID))
d.SetId(schemautil.BuildResourceID(projectName, res.ServiceIntegrationId))

if err = resourceServiceIntegrationWaitUntilActive(ctx, d, m, d.Timeout(schema.TimeoutCreate)); err != nil {
return diag.Errorf("unable to wait for service integration to become active: %s", err)
if err = resourceServiceIntegrationWaitUntilActive(ctx, d, client, d.Timeout(schema.TimeoutCreate)); err != nil {
return fmt.Errorf("unable to wait for service integration to become active: %w", err)
}
return resourceServiceIntegrationRead(ctx, d, m)
return resourceServiceIntegrationRead(ctx, d, client)
}

func resourceServiceIntegrationRead(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
client := m.(*aiven.Client)

func resourceServiceIntegrationRead(ctx context.Context, d *schema.ResourceData, client avngen.Client) error {
projectName, integrationID, err := schemautil.SplitResourceID2(d.Id())
if err != nil {
return diag.FromErr(err)
return err
}

res, err := client.ServiceIntegrations.Get(ctx, projectName, integrationID)
res, err := client.ServiceIntegrationGet(ctx, projectName, integrationID)
if err != nil {
err = schemautil.ResourceReadHandleNotFound(err, d)
if err != nil {
return diag.Errorf("cannot get service integration: %s; id: %s", err, integrationID)
return fmt.Errorf("cannot get service integration: %w; id: %s", err, integrationID)
}
return nil
}

if err = resourceServiceIntegrationCopyAPIResponseToTerraform(d, res, projectName); err != nil {
return diag.Errorf("cannot copy api response into terraform schema: %s", err)
return fmt.Errorf("cannot copy api response into terraform schema: %w", err)
}

return nil
}

func resourceServiceIntegrationUpdate(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
client := m.(*aiven.Client)

func resourceServiceIntegrationUpdate(ctx context.Context, d *schema.ResourceData, client avngen.Client) error {
projectName, integrationID, err := schemautil.SplitResourceID2(d.Id())
if err != nil {
return diag.FromErr(err)
return err
}

integrationType := d.Get("integration_type").(string)
userConfig, err := converters.Expand(converters.ServiceIntegrationUserConfig, integrationType, d)
if err != nil {
return diag.FromErr(err)
return err
}

if userConfig == nil {
// Required by API
userConfig = make(map[string]interface{})
}

_, err = client.ServiceIntegrations.Update(
_, err = client.ServiceIntegrationUpdate(
ctx,
projectName,
integrationID,
aiven.UpdateServiceIntegrationRequest{
&codegenintegrations.ServiceIntegrationUpdateIn{
UserConfig: userConfig,
},
)
if err != nil {
return diag.Errorf("unable to update service integration: %s", err)
return fmt.Errorf("unable to update service integration: %w", err)
}
if err = resourceServiceIntegrationWaitUntilActive(ctx, d, m, d.Timeout(schema.TimeoutUpdate)); err != nil {
return diag.Errorf("unable to wait for service integration to become active: %s", err)
if err = resourceServiceIntegrationWaitUntilActive(ctx, d, client, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("unable to wait for service integration to become active: %w", err)
}

return resourceServiceIntegrationRead(ctx, d, m)
return resourceServiceIntegrationRead(ctx, d, client)
}

func resourceServiceIntegrationDelete(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
client := m.(*aiven.Client)

func resourceServiceIntegrationDelete(ctx context.Context, d *schema.ResourceData, client avngen.Client) error {
projectName, integrationID, err := schemautil.SplitResourceID2(d.Id())
if err != nil {
return diag.FromErr(err)
return err
}

err = client.ServiceIntegrations.Delete(ctx, projectName, integrationID)
err = client.ServiceIntegrationDelete(ctx, projectName, integrationID)
if common.IsCritical(err) {
return diag.Errorf("cannot delete service integration: %s", err)
return fmt.Errorf("cannot delete service integration: %w", err)
}

return nil
}

func resourceServiceIntegrationCheckForPreexistingResource(ctx context.Context, d *schema.ResourceData, m interface{}) (*aiven.ServiceIntegration, error) {
client := m.(*aiven.Client)

func resourceServiceIntegrationCheckForPreexistingResource(ctx context.Context, d *schema.ResourceData, client avngen.Client) (*codegenintegrations.ServiceIntegrationOut, error) {
projectName := d.Get("project").(string)
integrationType := d.Get("integration_type").(string)
sourceServiceName := d.Get("source_service_name").(string)
destinationServiceName := d.Get("destination_service_name").(string)

integrations, err := client.ServiceIntegrations.List(ctx, projectName, sourceServiceName)
integrations, err := client.ServiceIntegrationList(ctx, projectName, sourceServiceName)
if common.IsCritical(err) {
return nil, fmt.Errorf("unable to get list of service integrations: %w", err)
}

for i := range integrations {
integration := integrations[i]
if integration.SourceService == nil || integration.DestinationService == nil || integration.ServiceIntegrationID == "" {
if integration.SourceService == "" || integration.DestService == nil || integration.ServiceIntegrationId == "" {
continue
}

if integration.IntegrationType == integrationType &&
*integration.SourceService == sourceServiceName &&
*integration.DestinationService == destinationServiceName {
return integration, nil
integration.SourceService == sourceServiceName &&
*integration.DestService == destinationServiceName {
return &integration, nil
}
}
return nil, nil
}

func resourceServiceIntegrationWaitUntilActive(ctx context.Context, d *schema.ResourceData, m interface{}, timeout time.Duration) error {
func resourceServiceIntegrationWaitUntilActive(ctx context.Context, d *schema.ResourceData, client avngen.Client, timeout time.Duration) error {
const (
active = "ACTIVE"
notActive = "NOTACTIVE"
)
client := m.(*aiven.Client)

projectName, integrationID, err := schemautil.SplitResourceID2(d.Id())
if err != nil {
return err
Expand All @@ -278,7 +268,7 @@ func resourceServiceIntegrationWaitUntilActive(ctx context.Context, d *schema.Re
Refresh: func() (interface{}, string, error) {
log.Println("[DEBUG] Service Integration: waiting until active")

ii, err := client.ServiceIntegrations.Get(ctx, projectName, integrationID)
ii, err := client.ServiceIntegrationGet(ctx, projectName, integrationID)
if err != nil {
// Sometimes Aiven API retrieves 404 error even when a successful service integration is created
if aiven.IsNotFound(err) {
Expand All @@ -292,8 +282,8 @@ func resourceServiceIntegrationWaitUntilActive(ctx context.Context, d *schema.Re
return nil, notActive, nil
}

if ii.IntegrationType == "kafka_connect" && ii.DestinationService != nil {
if _, err := client.KafkaConnectors.List(ctx, projectName, *ii.DestinationService); err != nil {
if ii.IntegrationType == "kafka_connect" && ii.DestService != nil {
if _, err := client.ServiceKafkaConnectList(ctx, projectName, *ii.DestService); err != nil {
log.Println("[DEBUG] Service Integration: error listing kafka connectors: ", err)
return nil, notActive, nil
}
Expand All @@ -313,32 +303,32 @@ func resourceServiceIntegrationWaitUntilActive(ctx context.Context, d *schema.Re

func resourceServiceIntegrationCopyAPIResponseToTerraform(
d *schema.ResourceData,
res *aiven.ServiceIntegration,
res *codegenintegrations.ServiceIntegrationGetOut,
project string,
) error {
if err := d.Set("project", project); err != nil {
return err
}

if res.DestinationEndpointID != nil {
if err := d.Set("destination_endpoint_id", schemautil.BuildResourceID(project, *res.DestinationEndpointID)); err != nil {
if res.DestEndpointId != nil {
if err := d.Set("destination_endpoint_id", schemautil.BuildResourceID(project, *res.DestEndpointId)); err != nil {
return err
}
} else if res.DestinationService != nil {
if err := d.Set("destination_service_name", *res.DestinationService); err != nil {
} else if res.DestService != nil {
if err := d.Set("destination_service_name", *res.DestService); err != nil {
return err
}
}
if res.SourceEndpointID != nil {
if err := d.Set("source_endpoint_id", schemautil.BuildResourceID(project, *res.SourceEndpointID)); err != nil {
if res.SourceEndpointId != nil {
if err := d.Set("source_endpoint_id", schemautil.BuildResourceID(project, *res.SourceEndpointId)); err != nil {
return err
}
} else if res.SourceService != nil {
if err := d.Set("source_service_name", *res.SourceService); err != nil {
} else if res.SourceService != "" {
if err := d.Set("source_service_name", res.SourceService); err != nil {
return err
}
}
if err := d.Set("integration_id", res.ServiceIntegrationID); err != nil {
if err := d.Set("integration_id", res.ServiceIntegrationId); err != nil {
return err
}
integrationType := res.IntegrationType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,49 @@ package serviceintegration

import (
"context"
"fmt"

"github.com/aiven/aiven-go-client/v2"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
avngen "github.com/aiven/go-client-codegen"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"

"github.com/aiven/terraform-provider-aiven/internal/common"
"github.com/aiven/terraform-provider-aiven/internal/schemautil"
)

func DatasourceServiceIntegration() *schema.Resource {
return &schema.Resource{
ReadContext: datasourceServiceIntegrationRead,
ReadContext: common.WithGenClient(datasourceServiceIntegrationRead),
Description: "Gets information about an Aiven service integration.",
Schema: schemautil.ResourceSchemaAsDatasourceSchema(aivenServiceIntegrationSchema(),
"project", "integration_type", "source_service_name", "destination_service_name"),
}
}

func datasourceServiceIntegrationRead(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
client := m.(*aiven.Client)

func datasourceServiceIntegrationRead(ctx context.Context, d *schema.ResourceData, client avngen.Client) error {
projectName := d.Get("project").(string)
integrationType := d.Get("integration_type").(string)
sourceServiceName := d.Get("source_service_name").(string)
destinationServiceName := d.Get("destination_service_name").(string)

integrations, err := client.ServiceIntegrations.List(ctx, projectName, sourceServiceName)
integrations, err := client.ServiceIntegrationList(ctx, projectName, sourceServiceName)
if err != nil {
return diag.Errorf("unable to list integrations for %s/%s: %s", projectName, sourceServiceName, err)
return fmt.Errorf("unable to list integrations for %s/%s: %w", projectName, sourceServiceName, err)
}

for _, i := range integrations {
if i.SourceService == nil || i.DestinationService == nil {
if i.SourceService == "" || i.DestService == nil {
continue
}

if i.IntegrationType == integrationType &&
*i.SourceService == sourceServiceName &&
*i.DestinationService == destinationServiceName {
i.SourceService == sourceServiceName &&
*i.DestService == destinationServiceName {

d.SetId(schemautil.BuildResourceID(projectName, i.ServiceIntegrationID))
return resourceServiceIntegrationRead(ctx, d, m)
d.SetId(schemautil.BuildResourceID(projectName, i.ServiceIntegrationId))
return resourceServiceIntegrationRead(ctx, d, client)
}
}

return diag.Errorf("common integration %s/%s/%s/%s not found",
return fmt.Errorf("common integration %s/%s/%s/%s not found",
projectName, integrationType, sourceServiceName, destinationServiceName)
}
Loading

0 comments on commit 82d0f67

Please sign in to comment.