Skip to content

Commit

Permalink
Merge pull request #23275 from gerrardcowburn/f-aws_glue_job-streamin…
Browse files Browse the repository at this point in the history
…g-enhancement

Glue streaming job update
  • Loading branch information
ewbankkit authored Mar 3, 2022
2 parents b311d69 + 70990eb commit ae123f7
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .changelog/23275.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_glue_job: Add support for [streaming jobs](https://docs.aws.amazon.com/glue/latest/dg/add-job-streaming.html) by removing the default value for the `timeout` argument and marking it as Computed
```
17 changes: 12 additions & 5 deletions internal/service/glue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,10 @@ func ResourceJob() *schema.Resource {
"tags": tftags.TagsSchema(),
"tags_all": tftags.TagsSchemaComputed(),
"timeout": {
Type: schema.TypeInt,
Optional: true,
Default: 2880,
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntAtLeast(1),
},
"security_configuration": {
Type: schema.TypeString,
Expand Down Expand Up @@ -172,7 +173,10 @@ func resourceJobCreate(d *schema.ResourceData, meta interface{}) error {
Name: aws.String(name),
Role: aws.String(d.Get("role_arn").(string)),
Tags: Tags(tags.IgnoreAWS()),
Timeout: aws.Int64(int64(d.Get("timeout").(int))),
}

if v, ok := d.GetOk("timeout"); ok {
input.Timeout = aws.Int64(int64(v.(int)))
}

if v, ok := d.GetOk("max_capacity"); ok {
Expand Down Expand Up @@ -334,7 +338,10 @@ func resourceJobUpdate(d *schema.ResourceData, meta interface{}) error {
jobUpdate := &glue.JobUpdate{
Command: expandGlueJobCommand(d.Get("command").([]interface{})),
Role: aws.String(d.Get("role_arn").(string)),
Timeout: aws.Int64(int64(d.Get("timeout").(int))),
}

if v, ok := d.GetOk("timeout"); ok {
jobUpdate.Timeout = aws.Int64(int64(v.(int)))
}

if v, ok := d.GetOk("number_of_workers"); ok {
Expand Down
90 changes: 90 additions & 0 deletions internal/service/glue/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,43 @@ func TestAccGlueJob_basic(t *testing.T) {
})
}

// TestAccGlueJob_basicStreaming is an acceptance test for a Glue job whose
// command name is "gluestreaming" and whose configuration sets no timeout.
//
// Step 1 applies the minimal streaming configuration and checks the job's ARN,
// command, argument maps, name, role, tags, and that "timeout" is stored as
// "0". Step 2 imports the resource and verifies the imported state.
func TestAccGlueJob_basicStreaming(t *testing.T) {
	const (
		resourceName     = "aws_glue_job.test"
		roleResourceName = "aws_iam_role.test"
	)

	var job glue.Job
	rName := "tf-acc-test-" + sdkacctest.RandString(5)

	// Create the streaming job and assert on every attribute of interest.
	createStep := resource.TestStep{
		Config: testAccJobConfig_RequiredStreaming(rName),
		Check: resource.ComposeTestCheckFunc(
			testAccCheckJobExists(resourceName, &job),
			acctest.CheckResourceAttrRegionalARN(resourceName, "arn", "glue", "job/"+rName),
			resource.TestCheckResourceAttr(resourceName, "command.#", "1"),
			resource.TestCheckResourceAttr(resourceName, "command.0.name", "gluestreaming"),
			resource.TestCheckResourceAttr(resourceName, "command.0.script_location", "testscriptlocation"),
			resource.TestCheckResourceAttr(resourceName, "default_arguments.%", "0"),
			resource.TestCheckResourceAttr(resourceName, "non_overridable_arguments.%", "0"),
			resource.TestCheckResourceAttr(resourceName, "name", rName),
			resource.TestCheckResourceAttrPair(resourceName, "role_arn", roleResourceName, "arn"),
			resource.TestCheckResourceAttr(resourceName, "tags.%", "0"),
			// No timeout is configured, so the state is expected to hold the zero value.
			resource.TestCheckResourceAttr(resourceName, "timeout", "0"),
		),
	}

	// Re-import the job and compare the imported state with the applied state.
	importStep := resource.TestStep{
		ResourceName:      resourceName,
		ImportState:       true,
		ImportStateVerify: true,
	}

	resource.ParallelTest(t, resource.TestCase{
		PreCheck:     func() { acctest.PreCheck(t) },
		ErrorCheck:   acctest.ErrorCheck(t, glue.EndpointsID),
		Providers:    acctest.Providers,
		CheckDestroy: testAccCheckJobDestroy,
		Steps:        []resource.TestStep{createStep, importStep},
	})
}
func TestAccGlueJob_command(t *testing.T) {
var job glue.Job

Expand Down Expand Up @@ -412,6 +449,40 @@ func TestAccGlueJob_tags(t *testing.T) {
})
}

// TestAccGlueJob_streamingTimeout is an acceptance test that applies a job
// configuration with timeout 1, updates it to timeout 2, and finally imports
// the resource and verifies the imported state.
//
// NOTE(review): the configuration comes from testAccJobConfig_Timeout, which
// is defined outside this view; confirm that it actually uses a
// "gluestreaming" command, otherwise this test does not exercise a streaming
// job despite its name.
func TestAccGlueJob_streamingTimeout(t *testing.T) {
	const resourceName = "aws_glue_job.test"

	var job glue.Job
	rName := "tf-acc-test-" + sdkacctest.RandString(5)

	steps := []resource.TestStep{
		// Initial apply with a one-minute timeout.
		{
			Config: testAccJobConfig_Timeout(rName, 1),
			Check: resource.ComposeTestCheckFunc(
				testAccCheckJobExists(resourceName, &job),
				resource.TestCheckResourceAttr(resourceName, "timeout", "1"),
			),
		},
		// In-place update to a two-minute timeout.
		{
			Config: testAccJobConfig_Timeout(rName, 2),
			Check: resource.ComposeTestCheckFunc(
				testAccCheckJobExists(resourceName, &job),
				resource.TestCheckResourceAttr(resourceName, "timeout", "2"),
			),
		},
		// Import and compare against the state left by the previous step.
		{
			ResourceName:      resourceName,
			ImportState:       true,
			ImportStateVerify: true,
		},
	}

	resource.ParallelTest(t, resource.TestCase{
		PreCheck:     func() { acctest.PreCheck(t) },
		ErrorCheck:   acctest.ErrorCheck(t, glue.EndpointsID),
		Providers:    acctest.Providers,
		CheckDestroy: testAccCheckJobDestroy,
		Steps:        steps,
	})
}
func TestAccGlueJob_timeout(t *testing.T) {
var job glue.Job

Expand Down Expand Up @@ -943,6 +1014,25 @@ resource "aws_glue_job" "test" {
`, testAccJobConfig_Base(rName), rName)
}

// testAccJobConfig_RequiredStreaming returns the Terraform configuration for
// an aws_glue_job named rName whose command name is "gluestreaming". The
// shared base configuration from testAccJobConfig_Base(rName) is placed ahead
// of the job resource, and the job sets no timeout argument.
func testAccJobConfig_RequiredStreaming(rName string) string {
	// First verb receives the base configuration, second the job name.
	const jobTemplate = `
%s
resource "aws_glue_job" "test" {
max_capacity = 10
name = "%s"
role_arn = aws_iam_role.test.arn
command {
name = "gluestreaming"
script_location = "testscriptlocation"
}
depends_on = [aws_iam_role_policy_attachment.test]
}
`

	baseConfig := testAccJobConfig_Base(rName)

	return fmt.Sprintf(jobTemplate, baseConfig, rName)
}

func testAccJobTags1Config(rName, tagKey1, tagValue1 string) string {
return testAccJobConfig_Base(rName) + fmt.Sprintf(`
resource "aws_glue_job" "test" {
Expand Down
18 changes: 16 additions & 2 deletions website/docs/r/glue_job.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ resource "aws_glue_job" "example" {
}
```

### Streaming Job

```terraform
resource "aws_glue_job" "example" {
name = "example streaming job"
role_arn = aws_iam_role.example.arn
command {
name = "gluestreaming"
script_location = "s3://${aws_s3_bucket.example.bucket}/example.script"
}
}
```

### Enabling CloudWatch Logs and Metrics

```terraform
Expand Down Expand Up @@ -82,14 +96,14 @@ The following arguments are supported:
* `notification_property` - (Optional) Notification property of the job. Defined below.
* `role_arn` – (Required) The ARN of the IAM role associated with this job.
* `tags` - (Optional) Key-value map of resource tags. If configured with a provider [`default_tags` configuration block](/docs/providers/aws/index.html#default_tags-configuration-block) present, tags with matching keys will overwrite those defined at the provider-level.
* `timeout` – (Optional) The job timeout in minutes. The default is 2880 minutes (48 hours).
* `timeout` – (Optional) The job timeout in minutes. The default is 2880 minutes (48 hours) for `glueetl` and `pythonshell` jobs, and null (unlimited) for `gluestreaming` jobs.
* `security_configuration` - (Optional) The name of the Security Configuration to be associated with the job.
* `worker_type` - (Optional) The type of predefined worker that is allocated when a job runs. Accepts a value of Standard, G.1X, or G.2X.
* `number_of_workers` - (Optional) The number of workers of a defined workerType that are allocated when a job runs.

### command Argument Reference

* `name` - (Optional) The name of the job command. Defaults to `glueetl`. Use `pythonshell` for Python Shell Job Type, `max_capacity` needs to be set if `pythonshell` is chosen.
* `name` - (Optional) The name of the job command. Defaults to `glueetl`. Use `pythonshell` for Python Shell Job Type, or `gluestreaming` for Streaming Job Type. `max_capacity` needs to be set if `pythonshell` is chosen.
* `script_location` - (Required) Specifies the S3 path to a script that executes a job.
* `python_version` - (Optional) The Python version being used to execute a Python shell job. Allowed values are 2 or 3.

Expand Down

0 comments on commit ae123f7

Please sign in to comment.