
New resource: Add blob reference input as a new resource for stream analytics input resources (rebased) #5129

Closed · wants to merge 12 commits
1 change: 1 addition & 0 deletions azurerm/provider.go
@@ -471,6 +471,7 @@ func Provider() terraform.ResourceProvider {
		"azurerm_stream_analytics_output_eventhub":          resourceArmStreamAnalyticsOutputEventHub(),
		"azurerm_stream_analytics_output_servicebus_queue":  resourceArmStreamAnalyticsOutputServiceBusQueue(),
		"azurerm_stream_analytics_output_servicebus_topic":  resourceArmStreamAnalyticsOutputServiceBusTopic(),
		"azurerm_stream_analytics_reference_input_blob":     resourceArmStreamAnalyticsReferenceInputBlob(),
		"azurerm_stream_analytics_stream_input_blob":        resourceArmStreamAnalyticsStreamInputBlob(),
		"azurerm_stream_analytics_stream_input_eventhub":    resourceArmStreamAnalyticsStreamInputEventHub(),
		"azurerm_stream_analytics_stream_input_iothub":      resourceArmStreamAnalyticsStreamInputIoTHub(),
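For context, a minimal configuration exercising the new resource might look like the following sketch. The attribute names come from the schema in the new resource file below; the referenced job, storage account, and container (and all values) are illustrative placeholders, and the `serialization` block is assumed to follow the convention of the existing stream input resources:

```hcl
resource "azurerm_stream_analytics_reference_input_blob" "example" {
  name                      = "blob-reference-input"
  stream_analytics_job_name = azurerm_stream_analytics_job.example.name
  resource_group_name       = azurerm_stream_analytics_job.example.resource_group_name
  storage_account_name      = azurerm_storage_account.example.name
  storage_account_key       = azurerm_storage_account.example.primary_access_key
  storage_container_name    = azurerm_storage_container.example.name
  path_pattern              = "some-random-pattern"
  date_format               = "yyyy/MM/dd"
  time_format               = "HH"

  serialization {
    type     = "Json"
    encoding = "UTF8"
  }
}
```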
304 changes: 304 additions & 0 deletions azurerm/resource_arm_stream_analytics_reference_input_blob.go
@@ -0,0 +1,304 @@
package azurerm

import (
	"fmt"
	"log"
	"time"

	"github.com/hashicorp/go-azure-helpers/response"

	"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/azure"
	"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/validate"

	"github.com/Azure/azure-sdk-for-go/services/streamanalytics/mgmt/2016-03-01/streamanalytics"

	"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
	"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/tf"
	"github.com/terraform-providers/terraform-provider-azurerm/azurerm/internal/features"
	"github.com/terraform-providers/terraform-provider-azurerm/azurerm/internal/timeouts"
	"github.com/terraform-providers/terraform-provider-azurerm/azurerm/utils"
)

func resourceArmStreamAnalyticsReferenceInputBlob() *schema.Resource {
	return &schema.Resource{
		Create: resourceArmStreamAnalyticsReferenceInputBlobCreate,
		Read:   resourceArmStreamAnalyticsReferenceInputBlobRead,
		Update: resourceArmStreamAnalyticsReferenceInputBlobUpdate,
		Delete: resourceArmStreamAnalyticsReferenceInputBlobDelete,
		Importer: &schema.ResourceImporter{
			State: schema.ImportStatePassthrough,
		},

		Timeouts: &schema.ResourceTimeout{
			Create: schema.DefaultTimeout(30 * time.Minute),
			Read:   schema.DefaultTimeout(5 * time.Minute),
			Update: schema.DefaultTimeout(30 * time.Minute),
			Delete: schema.DefaultTimeout(30 * time.Minute),
		},

		Schema: map[string]*schema.Schema{
			"name": {
				Type:         schema.TypeString,
				Required:     true,
				ForceNew:     true,
				ValidateFunc: validate.NoEmptyStrings,
			},

			"stream_analytics_job_name": {
				Type:         schema.TypeString,
				Required:     true,
				ForceNew:     true,
				ValidateFunc: validate.NoEmptyStrings,
			},

			"resource_group_name": azure.SchemaResourceGroupName(),

			"date_format": {
				Type:         schema.TypeString,
				Required:     true,
				ValidateFunc: validate.NoEmptyStrings,
			},

			"path_pattern": {
				Type:         schema.TypeString,
				Required:     true,
				ValidateFunc: validate.NoEmptyStrings,
			},

			"storage_account_key": {
				Type:         schema.TypeString,
				Required:     true,
				Sensitive:    true,
				ValidateFunc: validate.NoEmptyStrings,
			},

			"storage_account_name": {
				Type:         schema.TypeString,
				Required:     true,
				ValidateFunc: validate.NoEmptyStrings,
			},

			"storage_container_name": {
				Type:         schema.TypeString,
				Required:     true,
				ValidateFunc: validate.NoEmptyStrings,
			},

			"time_format": {
				Type:         schema.TypeString,
				Required:     true,
				ValidateFunc: validate.NoEmptyStrings,
			},

			"serialization": azure.SchemaStreamAnalyticsStreamInputSerialization(),
		},
	}
}

func resourceArmStreamAnalyticsReferenceInputBlobCreate(d *schema.ResourceData, meta interface{}) error {
	client := meta.(*ArmClient).StreamAnalytics.InputsClient
	ctx, cancel := timeouts.ForCreate(meta.(*ArmClient).StopContext, d)
	defer cancel()

	log.Printf("[INFO] preparing arguments for Azure Stream Analytics Reference Input Blob creation.")
	name := d.Get("name").(string)
	jobName := d.Get("stream_analytics_job_name").(string)
	resourceGroup := d.Get("resource_group_name").(string)

	if features.ShouldResourcesBeImported() && d.IsNewResource() {
		existing, err := client.Get(ctx, resourceGroup, jobName, name)
		if err != nil {
			if !utils.ResponseWasNotFound(existing.Response) {
				return fmt.Errorf("Error checking for presence of existing Stream Analytics Reference Input %q (Job %q / Resource Group %q): %s", name, jobName, resourceGroup, err)
			}
		}

		if existing.ID != nil && *existing.ID != "" {
			return tf.ImportAsExistsError("azurerm_stream_analytics_reference_input_blob", *existing.ID)
		}
	}

	containerName := d.Get("storage_container_name").(string)
	dateFormat := d.Get("date_format").(string)
	pathPattern := d.Get("path_pattern").(string)
	storageAccountKey := d.Get("storage_account_key").(string)
	storageAccountName := d.Get("storage_account_name").(string)
	timeFormat := d.Get("time_format").(string)

	serializationRaw := d.Get("serialization").([]interface{})
	serialization, err := azure.ExpandStreamAnalyticsStreamInputSerialization(serializationRaw)
	if err != nil {
		return fmt.Errorf("Error expanding `serialization`: %+v", err)
	}

	props := streamanalytics.Input{
		Name: utils.String(name),
		Properties: &streamanalytics.ReferenceInputProperties{
			Type: streamanalytics.TypeReference,
			Datasource: &streamanalytics.BlobReferenceInputDataSource{
				Type: streamanalytics.TypeBasicReferenceInputDataSourceTypeMicrosoftStorageBlob,
				BlobReferenceInputDataSourceProperties: &streamanalytics.BlobReferenceInputDataSourceProperties{
					Container:   utils.String(containerName),
					DateFormat:  utils.String(dateFormat),
					PathPattern: utils.String(pathPattern),
					TimeFormat:  utils.String(timeFormat),
					StorageAccounts: &[]streamanalytics.StorageAccount{
						{
							AccountName: utils.String(storageAccountName),
							AccountKey:  utils.String(storageAccountKey),
						},
					},
				},
			},
			Serialization: serialization,
		},
	}

	if _, err := client.CreateOrReplace(ctx, props, resourceGroup, jobName, name, "", ""); err != nil {
		return fmt.Errorf("Error Creating Stream Analytics Reference Input Blob %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
	}

	read, err := client.Get(ctx, resourceGroup, jobName, name)
	if err != nil {
		return fmt.Errorf("Error retrieving Stream Analytics Reference Input Blob %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
	}
	if read.ID == nil {
		return fmt.Errorf("Cannot read ID of Stream Analytics Reference Input Blob %q (Job %q / Resource Group %q)", name, jobName, resourceGroup)
	}

	d.SetId(*read.ID)

	return resourceArmStreamAnalyticsReferenceInputBlobRead(d, meta)
}

func resourceArmStreamAnalyticsReferenceInputBlobUpdate(d *schema.ResourceData, meta interface{}) error {
	client := meta.(*ArmClient).StreamAnalytics.InputsClient
	ctx, cancel := timeouts.ForUpdate(meta.(*ArmClient).StopContext, d)
	defer cancel()

	log.Printf("[INFO] preparing arguments for Azure Stream Analytics Reference Input Blob update.")
	name := d.Get("name").(string)
	jobName := d.Get("stream_analytics_job_name").(string)
	resourceGroup := d.Get("resource_group_name").(string)

	containerName := d.Get("storage_container_name").(string)
	dateFormat := d.Get("date_format").(string)
	pathPattern := d.Get("path_pattern").(string)
	storageAccountKey := d.Get("storage_account_key").(string)
	storageAccountName := d.Get("storage_account_name").(string)
	timeFormat := d.Get("time_format").(string)

	serializationRaw := d.Get("serialization").([]interface{})
	serialization, err := azure.ExpandStreamAnalyticsStreamInputSerialization(serializationRaw)
	if err != nil {
		return fmt.Errorf("Error expanding `serialization`: %+v", err)
	}

	props := streamanalytics.Input{
		Name: utils.String(name),
		Properties: &streamanalytics.ReferenceInputProperties{
			Type: streamanalytics.TypeReference,
			Datasource: &streamanalytics.BlobReferenceInputDataSource{
				Type: streamanalytics.TypeBasicReferenceInputDataSourceTypeMicrosoftStorageBlob,
				BlobReferenceInputDataSourceProperties: &streamanalytics.BlobReferenceInputDataSourceProperties{
					Container:   utils.String(containerName),
					DateFormat:  utils.String(dateFormat),
					PathPattern: utils.String(pathPattern),
					TimeFormat:  utils.String(timeFormat),
					StorageAccounts: &[]streamanalytics.StorageAccount{
						{
							AccountName: utils.String(storageAccountName),
							AccountKey:  utils.String(storageAccountKey),
						},
					},
				},
			},
			Serialization: serialization,
		},
	}

	if _, err := client.Update(ctx, props, resourceGroup, jobName, name, ""); err != nil {
		return fmt.Errorf("Error Updating Stream Analytics Reference Input Blob %q (Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
	}

	return resourceArmStreamAnalyticsReferenceInputBlobRead(d, meta)
}

func resourceArmStreamAnalyticsReferenceInputBlobRead(d *schema.ResourceData, meta interface{}) error {
	client := meta.(*ArmClient).StreamAnalytics.InputsClient
	ctx, cancel := timeouts.ForRead(meta.(*ArmClient).StopContext, d)
	defer cancel()

	id, err := azure.ParseAzureResourceID(d.Id())
	if err != nil {
		return err
	}
	resourceGroup := id.ResourceGroup
	jobName := id.Path["streamingjobs"]
	name := id.Path["inputs"]

	resp, err := client.Get(ctx, resourceGroup, jobName, name)
	if err != nil {
		if utils.ResponseWasNotFound(resp.Response) {
			log.Printf("[DEBUG] Reference Input Blob %q was not found in Stream Analytics Job %q / Resource Group %q - removing from state!", name, jobName, resourceGroup)
			d.SetId("")
			return nil
		}

		return fmt.Errorf("Error retrieving Reference Input Blob %q (Stream Analytics Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
	}

	d.Set("name", name)
	d.Set("resource_group_name", resourceGroup)
	d.Set("stream_analytics_job_name", jobName)

	if props := resp.Properties; props != nil {
		v, ok := props.AsReferenceInputProperties()
		if !ok {
			return fmt.Errorf("Error converting Input %q (Job %q / Resource Group %q) to a Reference Input", name, jobName, resourceGroup)
		}

		blobInputDataSource, ok := v.Datasource.AsBlobReferenceInputDataSource()
		if !ok {
			return fmt.Errorf("Error converting Reference Input %q (Job %q / Resource Group %q) to a Blob Reference Input", name, jobName, resourceGroup)
		}

		d.Set("date_format", blobInputDataSource.DateFormat)
		d.Set("path_pattern", blobInputDataSource.PathPattern)
		d.Set("storage_container_name", blobInputDataSource.Container)
		d.Set("time_format", blobInputDataSource.TimeFormat)

		// the API does not return the storage account key, so only the account name can be refreshed here
		if accounts := blobInputDataSource.StorageAccounts; accounts != nil && len(*accounts) > 0 {
			account := (*accounts)[0]
			d.Set("storage_account_name", account.AccountName)
		}

		if err := d.Set("serialization", azure.FlattenStreamAnalyticsStreamInputSerialization(v.Serialization)); err != nil {
			return fmt.Errorf("Error setting `serialization`: %+v", err)
		}
	}

	return nil
}

func resourceArmStreamAnalyticsReferenceInputBlobDelete(d *schema.ResourceData, meta interface{}) error {
	client := meta.(*ArmClient).StreamAnalytics.InputsClient
	ctx, cancel := timeouts.ForDelete(meta.(*ArmClient).StopContext, d)
	defer cancel()

	id, err := azure.ParseAzureResourceID(d.Id())
	if err != nil {
		return err
	}
	resourceGroup := id.ResourceGroup
	jobName := id.Path["streamingjobs"]
	name := id.Path["inputs"]

	if resp, err := client.Delete(ctx, resourceGroup, jobName, name); err != nil {
		if !response.WasNotFound(resp.Response) {
			return fmt.Errorf("Error deleting Reference Input Blob %q (Stream Analytics Job %q / Resource Group %q): %+v", name, jobName, resourceGroup, err)
		}
	}

	return nil
}
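Since the resource wires up `schema.ImportStatePassthrough` and parses IDs via `azure.ParseAzureResourceID` using the `streamingjobs` and `inputs` segments, an existing input should be importable with an ID of the following shape (the subscription ID, resource group, job, and input names below are placeholders):

```shell
terraform import azurerm_stream_analytics_reference_input_blob.example \
  /subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/example-resources/providers/Microsoft.StreamAnalytics/streamingjobs/example-job/inputs/example-input
```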