diff --git a/azurerm/internal/services/datafactory/data_factory.go b/azurerm/internal/services/datafactory/data_factory.go index 7b440aa37838..98f7f2182468 100644 --- a/azurerm/internal/services/datafactory/data_factory.go +++ b/azurerm/internal/services/datafactory/data_factory.go @@ -1,6 +1,7 @@ package datafactory import ( + "encoding/json" "fmt" "log" "regexp" @@ -9,6 +10,7 @@ import ( "github.com/Azure/azure-sdk-for-go/services/datafactory/mgmt/2018-06-01/datafactory" "github.com/hashicorp/terraform-plugin-sdk/helper/schema" + "github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/azure" ) func validateAzureRMDataFactoryLinkedServiceDatasetName(v interface{}, k string) (warnings []string, errors []error) { @@ -190,3 +192,38 @@ func flattenDataFactoryStructureColumns(input interface{}) []interface{} { } return output } + +func deserializeDataFactoryPipelineActivities(jsonData string) (*[]datafactory.BasicActivity, error) { + jsonData = fmt.Sprintf(`{ "activities": %s }`, jsonData) + pipeline := &datafactory.Pipeline{} + err := pipeline.UnmarshalJSON([]byte(jsonData)) + if err != nil { + return nil, err + } + return pipeline.Activities, nil +} + +func serializeDataFactoryPipelineActivities(activities *[]datafactory.BasicActivity) (string, error) { + pipeline := &datafactory.Pipeline{Activities: activities} + result, err := pipeline.MarshalJSON() + if err != nil { + return "", err + } + + var m map[string]*json.RawMessage + err = json.Unmarshal(result, &m) + if err != nil { + return "", err + } + + activitiesJson, err := json.Marshal(m["activities"]) + if err != nil { + return "", err + } + + return string(activitiesJson), nil +} + +func suppressJsonOrderingDifference(_, old, new string, _ *schema.ResourceData) bool { + return azure.NormalizeJson(old) == azure.NormalizeJson(new) +} diff --git a/azurerm/internal/services/datafactory/data_factory_pipeline_resource.go b/azurerm/internal/services/datafactory/data_factory_pipeline_resource.go
index 36ceb065484f..64d1bb8c0055 100644 --- a/azurerm/internal/services/datafactory/data_factory_pipeline_resource.go +++ b/azurerm/internal/services/datafactory/data_factory_pipeline_resource.go @@ -72,6 +72,13 @@ func resourceArmDataFactoryPipeline() *schema.Resource { Optional: true, }, + "activities_json": { + Type: schema.TypeString, + Optional: true, + StateFunc: azure.NormalizeJson, + DiffSuppressFunc: suppressJsonOrderingDifference, + }, + "annotations": { Type: schema.TypeList, Optional: true, @@ -98,7 +105,7 @@ func resourceArmDataFactoryPipelineCreateUpdate(d *schema.ResourceData, meta int existing, err := client.Get(ctx, resourceGroupName, dataFactoryName, name, "") if err != nil { if !utils.ResponseWasNotFound(existing.Response) { - return fmt.Errorf("Error checking for presence of existing Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %s", name, resourceGroupName, dataFactoryName, err) + return fmt.Errorf("checking for presence of existing Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %s", name, resourceGroupName, dataFactoryName, err) } } @@ -114,6 +121,14 @@ func resourceArmDataFactoryPipelineCreateUpdate(d *schema.ResourceData, meta int Description: &description, } + if v, ok := d.GetOk("activities_json"); ok { + activities, err := deserializeDataFactoryPipelineActivities(v.(string)) + if err != nil { + return fmt.Errorf("parsing 'activities_json' for Data Factory Pipeline %q (Resource Group %q / Data Factory %q) ID: %+v", name, resourceGroupName, dataFactoryName, err) + } + pipeline.Activities = activities + } + if v, ok := d.GetOk("annotations"); ok { annotations := v.([]interface{}) pipeline.Annotations = &annotations @@ -127,16 +142,16 @@ func resourceArmDataFactoryPipelineCreateUpdate(d *schema.ResourceData, meta int } if _, err := client.CreateOrUpdate(ctx, resourceGroupName, dataFactoryName, name, config, ""); err != nil { - return fmt.Errorf("Error creating Data Factory Pipeline %q (Resource Group %q / 
Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err) + return fmt.Errorf("creating Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err) } read, err := client.Get(ctx, resourceGroupName, dataFactoryName, name, "") if err != nil { - return fmt.Errorf("Error retrieving Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err) + return fmt.Errorf("retrieving Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err) } if read.ID == nil { - return fmt.Errorf("Cannot read Data Factory Pipeline %q (Resource Group %q / Data Factory %q) ID", name, resourceGroupName, dataFactoryName) + return fmt.Errorf("cannot read Data Factory Pipeline %q (Resource Group %q / Data Factory %q) ID", name, resourceGroupName, dataFactoryName) } d.SetId(*read.ID) @@ -163,7 +178,7 @@ func resourceArmDataFactoryPipelineRead(d *schema.ResourceData, meta interface{} log.Printf("[DEBUG] Data Factory Pipeline %q was not found in Resource Group %q - removing from state!", name, id.ResourceGroup) return nil } - return fmt.Errorf("Error reading the state of Data Factory Pipeline %q: %+v", name, err) + return fmt.Errorf("reading the state of Data Factory Pipeline %q: %+v", name, err) } d.Set("name", resp.Name) @@ -175,17 +190,27 @@ func resourceArmDataFactoryPipelineRead(d *schema.ResourceData, meta interface{} parameters := flattenDataFactoryParameters(props.Parameters) if err := d.Set("parameters", parameters); err != nil { - return fmt.Errorf("Error setting `parameters`: %+v", err) + return fmt.Errorf("setting `parameters`: %+v", err) } annotations := flattenDataFactoryAnnotations(props.Annotations) if err := d.Set("annotations", annotations); err != nil { - return fmt.Errorf("Error setting `annotations`: %+v", err) + return fmt.Errorf("setting `annotations`: %+v", err) } variables := 
flattenDataFactoryVariables(props.Variables) if err := d.Set("variables", variables); err != nil { - return fmt.Errorf("Error setting `variables`: %+v", err) + return fmt.Errorf("setting `variables`: %+v", err) + } + + if activities := props.Activities; activities != nil { + activitiesJson, err := serializeDataFactoryPipelineActivities(activities) + if err != nil { + return fmt.Errorf("serializing `activities_json`: %+v", err) + } + if err := d.Set("activities_json", activitiesJson); err != nil { + return fmt.Errorf("setting `activities_json`: %+v", err) + } } } @@ -206,7 +231,7 @@ func resourceArmDataFactoryPipelineDelete(d *schema.ResourceData, meta interface resourceGroupName := id.ResourceGroup if _, err = client.Delete(ctx, resourceGroupName, dataFactoryName, name); err != nil { - return fmt.Errorf("Error deleting Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err) + return fmt.Errorf("deleting Data Factory Pipeline %q (Resource Group %q / Data Factory %q): %+v", name, resourceGroupName, dataFactoryName, err) } return nil diff --git a/azurerm/internal/services/datafactory/data_factory_test.go b/azurerm/internal/services/datafactory/data_factory_test.go index 74966406cdf8..ac306ea6a31f 100644 --- a/azurerm/internal/services/datafactory/data_factory_test.go +++ b/azurerm/internal/services/datafactory/data_factory_test.go @@ -38,3 +38,168 @@ func TestAzureRmDataFactoryLinkedServiceConnectionStringDiff(t *testing.T) { } } } + +func TestAzureRmDataFactoryDeserializePipelineActivities(t *testing.T) { + cases := []struct { + Json string + ExpectActivityCount int + ExpectErr bool + }{ + { + Json: "{}", + ExpectActivityCount: 0, + ExpectErr: true, + }, + { + Json: `[ + { + "type": "ForEach", + "typeProperties": { + "isSequential": true, + "items": { + "value": "@pipeline().parameters.OutputBlobNameList", + "type": "Expression" + }, + "activities": [ + { + "type": "Copy", + "typeProperties": { + "source": 
{ + "type": "BlobSource" + }, + "sink": { + "type": "BlobSink" + }, + "dataIntegrationUnits": 32 + }, + "inputs": [ + { + "referenceName": "exampleDataset", + "parameters": { + "MyFolderPath": "examplecontainer", + "MyFileName": "examplecontainer.csv" + }, + "type": "DatasetReference" + } + ], + "outputs": [ + { + "referenceName": "exampleDataset", + "parameters": { + "MyFolderPath": "examplecontainer", + "MyFileName": { + "value": "@item()", + "type": "Expression" + } + }, + "type": "DatasetReference" + } + ], + "name": "ExampleCopyActivity" + } + ] + }, + "name": "ExampleForeachActivity" + } + ]`, + ExpectActivityCount: 1, + ExpectErr: false, + }, + } + + for _, tc := range cases { + items, err := deserializeDataFactoryPipelineActivities(tc.Json) + if err != nil { + if tc.ExpectErr { + t.Log("Expected error and got error") + continue + } + + t.Fatal(err) + } + + if items == nil && !tc.ExpectErr { + t.Fatal("Expected items got nil") + } + + if len(*items) != tc.ExpectActivityCount { + t.Fatal("Failed to deserialise pipeline") + } + } +} + +func TestNormalizeJSON(t *testing.T) { + cases := []struct { + Old string + New string + Suppress bool + }{ + { + Old: `[ + { + "name": "Append variable1", + "type": "AppendVariable", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "variableName": "bob", + "value": "something" + } + } + ]`, + New: `[ + { + "name": "Append variable1", + "type": "AppendVariable", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "value": "something", + "variableName": "bob" + } + } + ]`, + Suppress: true, + }, + { + Old: `[ + { + "name": "Append variable1", + "type": "AppendVariable", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "variableName": "bobdifferent", + "value": "something" + } + } + ]`, + New: `[ + { + "name": "Append variable1", + "type": "AppendVariable", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "value": "something", + "variableName": "bob" + } + } + ]`,
+ Suppress: false, + }, + { + Old: `{ "notbob": "notbill" }`, + New: `{ "bob": "bill" }`, + Suppress: false, + }, + } + + for _, tc := range cases { + suppress := suppressJsonOrderingDifference("test", tc.Old, tc.New, nil) + + if suppress != tc.Suppress { + t.Fatalf("Expected JsonOrderingDifference to be '%t' for '%s' '%s' - got '%t'", tc.Suppress, tc.Old, tc.New, suppress) + } + } +} diff --git a/azurerm/internal/services/datafactory/tests/data_factory_pipeline_resource_test.go b/azurerm/internal/services/datafactory/tests/data_factory_pipeline_resource_test.go index e1f5574c534b..85a107d1a738 100644 --- a/azurerm/internal/services/datafactory/tests/data_factory_pipeline_resource_test.go +++ b/azurerm/internal/services/datafactory/tests/data_factory_pipeline_resource_test.go @@ -63,6 +63,45 @@ func TestAccAzureRMDataFactoryPipeline_update(t *testing.T) { }) } +func TestAccAzureRMDataFactoryPipeline_activities(t *testing.T) { + data := acceptance.BuildTestData(t, "azurerm_data_factory_pipeline", "test") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acceptance.PreCheck(t) }, + Providers: acceptance.SupportedProviders, + CheckDestroy: testCheckAzureRMDataFactoryPipelineDestroy, + Steps: []resource.TestStep{ + { + Config: testAccAzureRMDataFactoryPipeline_activities(data), + Check: resource.ComposeTestCheckFunc( + testCheckAzureRMDataFactoryPipelineExists(data.ResourceName), + resource.TestCheckResourceAttrSet(data.ResourceName, "activities_json"), + testCheckAzureRMDataFactoryPipelineHasAppenVarActivity(data.ResourceName, "Append variable1"), + ), + }, + data.ImportStep(), + { + Config: testAccAzureRMDataFactoryPipeline_activitiesUpdated(data), + Check: resource.ComposeTestCheckFunc( + testCheckAzureRMDataFactoryPipelineExists(data.ResourceName), + resource.TestCheckResourceAttrSet(data.ResourceName, "activities_json"), + testCheckAzureRMDataFactoryPipelineHasAppenVarActivity(data.ResourceName, "Append variable1"), + ), + }, + 
data.ImportStep(), + { + Config: testAccAzureRMDataFactoryPipeline_activities(data), + Check: resource.ComposeTestCheckFunc( + testCheckAzureRMDataFactoryPipelineExists(data.ResourceName), + resource.TestCheckResourceAttrSet(data.ResourceName, "activities_json"), + testCheckAzureRMDataFactoryPipelineHasAppenVarActivity(data.ResourceName, "Append variable1"), + ), + }, + data.ImportStep(), + }, + }) +} + func testCheckAzureRMDataFactoryPipelineDestroy(s *terraform.State) error { client := acceptance.AzureProvider.Meta().(*clients.Client).DataFactory.PipelinesClient ctx := acceptance.AzureProvider.Meta().(*clients.Client).StopContext @@ -115,6 +154,41 @@ func testCheckAzureRMDataFactoryPipelineExists(resourceName string) resource.Tes } } +func testCheckAzureRMDataFactoryPipelineHasAppenVarActivity(resourceName string, activityName string) resource.TestCheckFunc { + return func(s *terraform.State) error { + client := acceptance.AzureProvider.Meta().(*clients.Client).DataFactory.PipelinesClient + ctx := acceptance.AzureProvider.Meta().(*clients.Client).StopContext + + rs, ok := s.RootModule().Resources[resourceName] + if !ok { + return fmt.Errorf("Not found: %s", resourceName) + } + + name := rs.Primary.Attributes["name"] + dataFactoryName := rs.Primary.Attributes["data_factory_name"] + resourceGroup := rs.Primary.Attributes["resource_group_name"] + + resp, err := client.Get(ctx, resourceGroup, dataFactoryName, name, "") + if err != nil { + if utils.ResponseWasNotFound(resp.Response) { + return fmt.Errorf("Bad: Data Factory Pipeline %q (Resource Group %q / Data Factory %q) does not exist", name, resourceGroup, dataFactoryName) + } + return fmt.Errorf("Bad: Get on DataFactoryPipelineClient: %+v", err) + } + + activities := *resp.Activities + if len(activities) == 0 { + return fmt.Errorf("Bad: No activities associated with Data Factory Pipeline %q (Resource Group %q / Data Factory %q)", name, resourceGroup, dataFactoryName) + } + appvarActivity, _ := 
activities[0].AsAppendVariableActivity() + if *appvarActivity.Name != activityName { + return fmt.Errorf("Bad: Data Factory Pipeline %q (Resource Group %q / Data Factory %q) could not cast as activity", name, resourceGroup, dataFactoryName) + } + + return nil + } +} + func testAccAzureRMDataFactoryPipeline_basic(data acceptance.TestData) string { return fmt.Sprintf(` provider "azurerm" { @@ -213,3 +287,87 @@ resource "azurerm_data_factory_pipeline" "test" { } `, data.RandomInteger, data.Locations.Primary, data.RandomInteger, data.RandomInteger) } + +func testAccAzureRMDataFactoryPipeline_activities(data acceptance.TestData) string { + return fmt.Sprintf(` +provider "azurerm" { + features {} +} + +resource "azurerm_resource_group" "test" { + name = "acctestRG-%d" + location = "%s" +} + +resource "azurerm_data_factory" "test" { + name = "acctestdfv2%d" + location = azurerm_resource_group.test.location + resource_group_name = azurerm_resource_group.test.name +} + +resource "azurerm_data_factory_pipeline" "test" { + name = "acctest%d" + resource_group_name = azurerm_resource_group.test.name + data_factory_name = azurerm_data_factory.test.name + variables = { + "bob" = "item1" + } + activities_json = <