Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Resource: Dataflow Job #855

Merged
merged 11 commits into from
Jan 10, 2018
12 changes: 10 additions & 2 deletions google/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
computeBeta "google.golang.org/api/compute/v0.beta"
"google.golang.org/api/compute/v1"
"google.golang.org/api/container/v1"
"google.golang.org/api/dataflow/v1b3"
"google.golang.org/api/dataproc/v1"
"google.golang.org/api/dns/v1"
"google.golang.org/api/iam/v1"
Expand All @@ -50,6 +51,7 @@ type Config struct {
clientComputeBeta *computeBeta.Service
clientContainer *container.Service
clientDataproc *dataproc.Service
clientDataflow *dataflow.Service
clientDns *dns.Service
clientKms *cloudkms.Service
clientLogging *cloudlogging.Service
Expand All @@ -64,8 +66,7 @@ type Config struct {
clientIAM *iam.Service
clientServiceMan *servicemanagement.APIService
clientBigQuery *bigquery.Service

bigtableClientFactory *BigtableClientFactory
bigtableClientFactory *BigtableClientFactory
}

func (c *Config) loadAndValidate() error {
Expand Down Expand Up @@ -195,6 +196,13 @@ func (c *Config) loadAndValidate() error {
}
c.clientPubsub.UserAgent = userAgent

log.Printf("[INFO] Instantiating Google Dataflow Client...")
c.clientDataflow, err = dataflow.New(client)
if err != nil {
return err
}
c.clientDataflow.UserAgent = userAgent

log.Printf("[INFO] Instantiating Google Cloud ResourceManager Client...")
c.clientResourceManager, err = cloudresourcemanager.New(client)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions google/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func Provider() terraform.ResourceProvider {
"google_compute_vpn_tunnel": resourceComputeVpnTunnel(),
"google_container_cluster": resourceContainerCluster(),
"google_container_node_pool": resourceContainerNodePool(),
"google_dataflow_job": resourceDataflowJob(),
"google_dataproc_cluster": resourceDataprocCluster(),
"google_dataproc_job": resourceDataprocJob(),
"google_dns_managed_zone": resourceDnsManagedZone(),
Expand Down
201 changes: 201 additions & 0 deletions google/resource_dataflow_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package google
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll also want to add documentation in website/docs/r/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this last night but forgot to comment.


import (
"fmt"
"log"
"strings"
"time"

"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/helper/validation"

"google.golang.org/api/dataflow/v1b3"
"google.golang.org/api/googleapi"
)

// dataflowTerminalStatesMap is the set of Dataflow job states that this
// provider treats as "stopped": Read drops the resource from state when the
// job is in one of them, and Delete waits until one is reached. Note that
// DRAINING/CANCELLING are included even though the service may still be
// winding the job down — presumably because no further action is possible
// once they are requested (TODO confirm against the Dataflow API docs).
var dataflowTerminalStatesMap = map[string]struct{}{
"JOB_STATE_DONE": {},
"JOB_STATE_FAILED": {},
"JOB_STATE_CANCELLED": {},
"JOB_STATE_UPDATED": {},
"JOB_STATE_DRAINING": {},
"JOB_STATE_DRAINED": {},
"JOB_STATE_CANCELLING": {},
}

// resourceDataflowJob defines the schema and CRUD operations for the
// google_dataflow_job resource. Dataflow jobs cannot be updated in place,
// so every configurable field is ForceNew and there is no Update function.
//
// All map entries use the simplified composite-literal form (gofmt -s);
// the original mixed `&schema.Schema{...}` and `{...}` styles.
func resourceDataflowJob() *schema.Resource {
	return &schema.Resource{
		Create: resourceDataflowJobCreate,
		Read:   resourceDataflowJobRead,
		Delete: resourceDataflowJobDelete,

		Schema: map[string]*schema.Schema{
			// User-visible name of the Dataflow job.
			"name": {
				Type:     schema.TypeString,
				Required: true,
				ForceNew: true,
			},

			// GCS path (gs://...) of the Dataflow template to launch.
			"template_gcs_path": {
				Type:     schema.TypeString,
				Required: true,
				ForceNew: true,
			},

			// GCS location the job may use for temporary storage.
			"temp_gcs_location": {
				Type:     schema.TypeString,
				Required: true,
				ForceNew: true,
			},

			"zone": {
				Type:     schema.TypeString,
				Optional: true,
				ForceNew: true,
			},

			"max_workers": {
				Type:     schema.TypeInt,
				Optional: true,
				ForceNew: true,
			},

			// Free-form key/value parameters passed to the template.
			"parameters": {
				Type:     schema.TypeMap,
				Optional: true,
				ForceNew: true,
			},

			// How to stop the job on destroy: "drain" lets in-flight work
			// finish first, "cancel" stops immediately.
			"on_delete": {
				Type:         schema.TypeString,
				ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
				Optional:     true,
				Default:      "drain",
				ForceNew:     true,
			},

			"project": {
				Type:     schema.TypeString,
				Optional: true,
				ForceNew: true,
			},

			// Current job state as reported by the Dataflow service.
			"state": {
				Type:     schema.TypeString,
				Computed: true,
			},
		},
	}
}

func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

project, err := getProject(d, config)
if err != nil {
return err
}

zone, err := getZone(d, config)
if err != nil {
return err
}
params := expandStringMap(d, "parameters")

env := dataflow.RuntimeEnvironment{
TempLocation: d.Get("temp_gcs_location").(string),
Zone: zone,
MaxWorkers: int64(d.Get("max_workers").(int)),
}

request := dataflow.CreateJobFromTemplateRequest{
JobName: d.Get("name").(string),
GcsPath: d.Get("template_gcs_path").(string),
Parameters: params,
Environment: &env,
}

job, err := config.clientDataflow.Projects.Templates.Create(project, &request).Do()
if err != nil {
return err
}
d.SetId(job.Id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For compute resources that are asynchronous, we usually follow this order:

  • Send the request
  • Set the Id
  • Wait on the request
  • If there's an error, set the id to ""

Since dataflow seems to use a synchronous API, we can't follow that exact pattern, but I still think setting the id before sending the request is probably the way to go:

  • If the Terraform process quits for whatever reason when the request is running, the request will have already been sent, so it means that the resource does exist in GCP. With the id in the state file already, we can read the information back when we call the read() function on plan, and we don't lose out on anything. If the id didn't get saved, then Terraform would think it needed to recreate it.
  • If the Terraform process quits (for whatever reason) exactly between when we set the id and when we make the request, then when it does the refresh on plan it would just see that it doesn't actually exist and set the id back to empty.
  • If the Terraform process doesn't quit then it doesn't really matter when the id gets set.

Anyway that's a long explanation for a tiny change that doesn't make a difference 99% of the time, but now you know :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately the ID here is created on the server side and not delivered back to us until the create has already succeeded. Think it's worth trying to use an async api instead?


return resourceDataflowJobRead(d, meta)
}

func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

project, err := getProject(d, config)
if err != nil {
return err
}

id := d.Id()

job, err := config.clientDataflow.Projects.Jobs.Get(project, id).Do()
if err != nil {
return handleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", id))
}

if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
d.SetId("")
return nil
}
d.Set("state", job.CurrentState)
d.Set("name", job.Name)
d.Set("project", project)
d.SetId(job.Id)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally it's a good idea to set as many fields as you have access to on read, not just computed ones. This helps detect any drift between what Terraform thinks is true and what GCP thinks is true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return nil
}

func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

project, err := getProject(d, config)
if err != nil {
return err
}

id := d.Id()
requestedState, err := mapOnDelete(d.Get("on_delete").(string))
if err != nil {
return err
}
for _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok; _, ok = dataflowTerminalStatesMap[d.Get("state").(string)] {
job := &dataflow.Job{
RequestedState: requestedState,
}

_, err = config.clientDataflow.Projects.Jobs.Update(project, id, job).Do()
if gerr, ok := err.(*googleapi.Error); !ok {
// If we have an error and it's not a google-specific error, we should go ahead and return.
return err
} else if ok && strings.Contains(gerr.Message, "not yet ready for canceling") {
time.Sleep(5 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So usually for something that requires checking a bunch of times if something is in a certain state, we would use a StateChangeConf (see all the _operation.go files for examples), which handles things like timeouts and backoff and whatnot. Is there a way to know just based on read whether the job is ready for canceling, or is it something you can only know by trying it and seeing if it works?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can only know by trying it, it seems to me.

} else {
return err
}
err = resourceDataflowJobRead(d, meta)
if err != nil {
return err
}
}
d.SetId("")

return nil
}

// mapOnDelete translates the resource's on_delete policy ("cancel" or
// "drain") into the Dataflow requested-state constant used to stop a job.
// Any other value yields an error.
//
// The error string is lowercased per Go convention (staticcheck ST1005):
// error messages are frequently wrapped mid-sentence by callers.
func mapOnDelete(policy string) (string, error) {
	switch policy {
	case "cancel":
		return "JOB_STATE_CANCELLED", nil
	case "drain":
		return "JOB_STATE_DRAINING", nil
	default:
		return "", fmt.Errorf("invalid `on_delete` policy: %s", policy)
	}
}
91 changes: 91 additions & 0 deletions google/resource_dataflow_job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package google

import (
"fmt"
"testing"

"github.com/hashicorp/terraform/helper/acctest"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)

// TestAccDataflowJobCreate is an acceptance test: it applies the fixture
// config, checks the job exists in the Dataflow API, and relies on
// CheckDestroy to verify the job is stopped on teardown.
func TestAccDataflowJobCreate(t *testing.T) {
	t.Parallel()

	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataflowJobDestroy,
		Steps: []resource.TestStep{
			{
				Config: testAccDataflowJob,
				Check: resource.ComposeTestCheckFunc(
					testAccDataflowJobExists("google_dataflow_job.big_data"),
				),
			},
		},
	})
}

// testAccCheckDataflowJobDestroy verifies every Dataflow job created by the
// test has been stopped. Dataflow jobs are never deleted outright, only
// moved to a terminal state, so "destroyed" means "in a terminal state".
//
// Fixes an inverted check in the original: it returned "Job still present"
// exactly when the job HAD reached a terminal state (i.e. was properly
// destroyed) and passed while the job was still running.
func testAccCheckDataflowJobDestroy(s *terraform.State) error {
	for _, rs := range s.RootModule().Resources {
		if rs.Type != "google_dataflow_job" {
			continue
		}

		config := testAccProvider.Meta().(*Config)
		job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).Do()
		if job != nil {
			// The job record exists; it only counts as destroyed once it
			// has reached a terminal state.
			if _, ok := dataflowTerminalStatesMap[job.CurrentState]; !ok {
				return fmt.Errorf("Job still present")
			}
		} else if err != nil {
			return err
		}
	}

	return nil
}

// testAccDataflowJobExists returns a check that the named resource has an
// ID recorded in state and that a job with that ID is visible via the
// Dataflow API.
func testAccDataflowJobExists(n string) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		rs, found := s.RootModule().Resources[n]
		if !found {
			return fmt.Errorf("Not found: %s", n)
		}
		if rs.Primary.ID == "" {
			return fmt.Errorf("No ID is set")
		}

		config := testAccProvider.Meta().(*Config)
		if _, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).Do(); err != nil {
			return fmt.Errorf("Job does not exist")
		}

		return nil
	}
}

// testAccDataflowJob is the acceptance-test fixture: a scratch GCS bucket
// for temp/output data plus a Dataflow job launched from the public
// wordcount template. Random suffixes keep resource names unique across
// concurrent test runs; on_delete = "cancel" stops the job immediately on
// destroy instead of draining.
var testAccDataflowJob = fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "dfjob-test-%s-temp"

force_destroy = true
}

resource "google_dataflow_job" "big_data" {
name = "dfjob-test-%s"

template_gcs_path = "gs://dataflow-templates/wordcount/template_file"
temp_gcs_location = "${google_storage_bucket.temp.url}"

parameters {
inputFile = "gs://dataflow-samples/shakespeare/kinglear.txt"
output = "${google_storage_bucket.temp.url}/output"
}
zone = "us-central1-f"
project = "%s"

on_delete = "cancel"
}`, acctest.RandString(10), acctest.RandString(10), getTestProjectFromEnv())
Loading