-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New Resource: Dataflow Job #855
Changes from 8 commits
8869170
ba23a52
9fb15bd
7487f51
52ffad0
6529289
c2b1b01
6956ec6
844310c
0f047a6
828909f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
package google | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"strings" | ||
"time" | ||
|
||
"github.com/hashicorp/terraform/helper/schema" | ||
"github.com/hashicorp/terraform/helper/validation" | ||
|
||
"google.golang.org/api/dataflow/v1b3" | ||
"google.golang.org/api/googleapi" | ||
) | ||
|
||
var dataflowTerminalStatesMap = map[string]struct{}{ | ||
"JOB_STATE_DONE": {}, | ||
"JOB_STATE_FAILED": {}, | ||
"JOB_STATE_CANCELLED": {}, | ||
"JOB_STATE_UPDATED": {}, | ||
"JOB_STATE_DRAINING": {}, | ||
"JOB_STATE_DRAINED": {}, | ||
"JOB_STATE_CANCELLING": {}, | ||
} | ||
|
||
func resourceDataflowJob() *schema.Resource { | ||
return &schema.Resource{ | ||
Create: resourceDataflowJobCreate, | ||
Read: resourceDataflowJobRead, | ||
Delete: resourceDataflowJobDelete, | ||
|
||
Schema: map[string]*schema.Schema{ | ||
"name": &schema.Schema{ | ||
Type: schema.TypeString, | ||
Required: true, | ||
ForceNew: true, | ||
}, | ||
|
||
"template_gcs_path": &schema.Schema{ | ||
Type: schema.TypeString, | ||
Required: true, | ||
ForceNew: true, | ||
}, | ||
|
||
"temp_gcs_location": &schema.Schema{ | ||
Type: schema.TypeString, | ||
Required: true, | ||
ForceNew: true, | ||
}, | ||
|
||
"zone": &schema.Schema{ | ||
Type: schema.TypeString, | ||
Optional: true, | ||
ForceNew: true, | ||
}, | ||
|
||
"max_workers": &schema.Schema{ | ||
Type: schema.TypeInt, | ||
Optional: true, | ||
ForceNew: true, | ||
}, | ||
|
||
"parameters": { | ||
Type: schema.TypeMap, | ||
Optional: true, | ||
ForceNew: true, | ||
}, | ||
|
||
"on_delete": &schema.Schema{ | ||
Type: schema.TypeString, | ||
ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false), | ||
Optional: true, | ||
Default: "drain", | ||
ForceNew: true, | ||
}, | ||
|
||
"project": &schema.Schema{ | ||
Type: schema.TypeString, | ||
Optional: true, | ||
ForceNew: true, | ||
}, | ||
|
||
"state": &schema.Schema{ | ||
Type: schema.TypeString, | ||
Computed: true, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error { | ||
config := meta.(*Config) | ||
|
||
project, err := getProject(d, config) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
zone, err := getZone(d, config) | ||
if err != nil { | ||
return err | ||
} | ||
params := expandStringMap(d, "parameters") | ||
|
||
env := dataflow.RuntimeEnvironment{ | ||
TempLocation: d.Get("temp_gcs_location").(string), | ||
Zone: zone, | ||
MaxWorkers: int64(d.Get("max_workers").(int)), | ||
} | ||
|
||
request := dataflow.CreateJobFromTemplateRequest{ | ||
JobName: d.Get("name").(string), | ||
GcsPath: d.Get("template_gcs_path").(string), | ||
Parameters: params, | ||
Environment: &env, | ||
} | ||
|
||
job, err := config.clientDataflow.Projects.Templates.Create(project, &request).Do() | ||
if err != nil { | ||
return err | ||
} | ||
d.SetId(job.Id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For compute resources that are asynchronous, we usually follow this order:
Since dataflow seems to use a synchronous API, we can't follow that exact pattern, but I still think setting the id before sending the request is probably the way to go:
Anyway that's a long explanation for a tiny change that doesn't make a difference 99% of the time, but now you know :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately the ID here is created on the server side and not delivered back to us until the create has already succeeded. Think it's worth trying to use an async api instead? |
||
|
||
return resourceDataflowJobRead(d, meta) | ||
} | ||
|
||
func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error { | ||
config := meta.(*Config) | ||
|
||
project, err := getProject(d, config) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
id := d.Id() | ||
|
||
job, err := config.clientDataflow.Projects.Jobs.Get(project, id).Do() | ||
if err != nil { | ||
return handleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", id)) | ||
} | ||
|
||
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok { | ||
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState) | ||
d.SetId("") | ||
return nil | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: no need for this "else" (https://github.com/golang/go/wiki/CodeReviewComments#indent-error-flow) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, good point, thanks. |
||
d.Set("state", job.CurrentState) | ||
d.Set("name", job.Name) | ||
d.Set("project", project) | ||
d.SetId(job.Id) | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Generally it's a good idea to set as many fields as you have access to on read, not just computed ones. This helps detect any drift between what Terraform thinks is true and what GCP thinks is true. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
return nil | ||
} | ||
|
||
func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error { | ||
config := meta.(*Config) | ||
|
||
project, err := getProject(d, config) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
id := d.Id() | ||
requestedState, err := mapOnDelete(d.Get("on_delete").(string)) | ||
if err != nil { | ||
return err | ||
} | ||
for _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok; _, ok = dataflowTerminalStatesMap[d.Get("state").(string)] { | ||
job := &dataflow.Job{ | ||
RequestedState: requestedState, | ||
} | ||
|
||
_, err = config.clientDataflow.Projects.Jobs.Update(project, id, job).Do() | ||
if gerr, ok := err.(*googleapi.Error); !ok { | ||
// If we have an error and it's not a google-specific error, we should go ahead and return. | ||
return err | ||
} else if ok && strings.Contains(gerr.Message, "not yet ready for canceling") { | ||
time.Sleep(5 * time.Second) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So usually for something that requires checking a bunch of times if something is in a certain state, we would use a StateChangeConf (see all the _operation.go files for examples), which handles things like timeouts and backoff and whatnot. Is there a way to know just based on read whether the job is ready for canceling, or is it something you can only know by trying it and seeing if it works? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can only know by trying it, it seems to me. |
||
} else { | ||
return err | ||
} | ||
err = resourceDataflowJobRead(d, meta) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
d.SetId("") | ||
|
||
return nil | ||
} | ||
|
||
func mapOnDelete(policy string) (string, error) { | ||
switch policy { | ||
case "cancel": | ||
return "JOB_STATE_CANCELLED", nil | ||
case "drain": | ||
return "JOB_STATE_DRAINING", nil | ||
default: | ||
return "", fmt.Errorf("Invalid `on_delete` policy: %s", policy) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package google | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/hashicorp/terraform/helper/acctest" | ||
"github.com/hashicorp/terraform/helper/resource" | ||
"github.com/hashicorp/terraform/terraform" | ||
) | ||
|
||
func TestAccDataflowJobCreate(t *testing.T) { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
resource.Test(t, resource.TestCase{ | ||
PreCheck: func() { testAccPreCheck(t) }, | ||
Providers: testAccProviders, | ||
CheckDestroy: testAccCheckDataflowJobDestroy, | ||
Steps: []resource.TestStep{ | ||
resource.TestStep{ | ||
Config: testAccDataflowJob, | ||
Check: resource.ComposeTestCheckFunc( | ||
testAccDataflowJobExists( | ||
"google_dataflow_job.big_data"), | ||
), | ||
}, | ||
}, | ||
}) | ||
} | ||
|
||
func testAccCheckDataflowJobDestroy(s *terraform.State) error { | ||
for _, rs := range s.RootModule().Resources { | ||
if rs.Type != "google_dataflow_job" { | ||
continue | ||
} | ||
|
||
config := testAccProvider.Meta().(*Config) | ||
job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).Do() | ||
if job != nil { | ||
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok { | ||
return fmt.Errorf("Job still present") | ||
} | ||
} else if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func testAccDataflowJobExists(n string) resource.TestCheckFunc { | ||
return func(s *terraform.State) error { | ||
rs, ok := s.RootModule().Resources[n] | ||
if !ok { | ||
return fmt.Errorf("Not found: %s", n) | ||
} | ||
|
||
if rs.Primary.ID == "" { | ||
return fmt.Errorf("No ID is set") | ||
} | ||
config := testAccProvider.Meta().(*Config) | ||
_, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).Do() | ||
if err != nil { | ||
return fmt.Errorf("Job does not exist") | ||
} | ||
|
||
return nil | ||
} | ||
} | ||
|
||
var testAccDataflowJob = fmt.Sprintf(` | ||
resource "google_storage_bucket" "temp" { | ||
name = "dfjob-test-%s-temp" | ||
|
||
force_destroy = true | ||
} | ||
|
||
resource "google_dataflow_job" "big_data" { | ||
name = "dfjob-test-%s" | ||
|
||
template_gcs_path = "gs://dataflow-templates/wordcount/template_file" | ||
temp_gcs_location = "${google_storage_bucket.temp.url}" | ||
|
||
parameters { | ||
inputFile = "gs://dataflow-samples/shakespeare/kinglear.txt" | ||
output = "${google_storage_bucket.temp.url}/output" | ||
} | ||
zone = "us-central1-f" | ||
project = "%s" | ||
|
||
on_delete = "cancel" | ||
}`, acctest.RandString(10), acctest.RandString(10), getTestProjectFromEnv()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll also want to add documentation in website/docs/r/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this last night but forgot to comment.