Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/resources/airflow_pool.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@ The following arguments are supported:

* `name` - (Required) The name of pool.
* `slots` - (Required) The maximum number of slots that can be assigned to tasks. One job may occupy one or more slots.
* `description` - (Optional) The description of the pool.
* `include_deferred` - (Optional) Whether to include deferred tasks in the pool's

## Attributes Reference

This resource exports the following attributes:

* `id` - The pool name.
* `occupied_slots` - The number of slots used by running/queued tasks at the moment.
* `used_slots` - The number of slots used by running tasks at the moment.
* `queued_slots` - The number of slots used by queued tasks at the moment.
* `open_slots` - The number of free slots at the moment.
* `running_slots` - The number of slots used by running tasks at the moment.
* `deferred_slots` - The number of slots used by deferred tasks at the moment.
* `scheduled_slots` - The number of slots used by scheduled tasks at the moment.

## Import

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,5 @@ require (
google.golang.org/grpc v1.75.1 // indirect
google.golang.org/protobuf v1.36.9 // indirect
)

replace github.com/apache/airflow-client-go/airflow => github.com/drfaust92/airflow-client-go/airflow v0.0.0-20250103045940-da4c5c1666ad
336 changes: 2 additions & 334 deletions go.sum

Large diffs are not rendered by default.

57 changes: 49 additions & 8 deletions internal/provider/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,36 @@ func resourcePool() *schema.Resource {
Type: schema.TypeInt,
Required: true,
},
"description": {
Type: schema.TypeString,
Optional: true,
},
"include_deferred": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"occupied_slots": {
Type: schema.TypeInt,
Computed: true,
},
"used_slots": {
"queued_slots": {
Type: schema.TypeInt,
Computed: true,
},
"queued_slots": {
"open_slots": {
Type: schema.TypeInt,
Computed: true,
},
"open_slots": {
"running_slots": {
Type: schema.TypeInt,
Computed: true,
},
"deferred_slots": {
Type: schema.TypeInt,
Computed: true,
},
"scheduled_slots": {
Type: schema.TypeInt,
Computed: true,
},
Expand All @@ -53,11 +70,17 @@ func resourcePoolCreate(ctx context.Context, d *schema.ResourceData, m interface

name := d.Get("name").(string)
slots := int32(d.Get("slots").(int))
includeDeferred := d.Get("include_deferred").(bool)
varApi := client.PoolApi

pool := airflow.Pool{
Name: &name,
Slots: &slots,
Name: &name,
Slots: &slots,
IncludeDeferred: &includeDeferred,
}

if description, ok := d.GetOk("description"); ok {
pool.SetDescription(description.(string))
}

_, _, err := varApi.PostPool(pcfg.AuthContext).Pool(pool).Execute()
Expand Down Expand Up @@ -97,7 +120,19 @@ func resourcePoolRead(ctx context.Context, d *schema.ResourceData, m interface{}
if err := d.Set("open_slots", pool.OpenSlots); err != nil {
return diag.FromErr(err)
}
if err := d.Set("used_slots", pool.UsedSlots); err != nil {
if err := d.Set("description", pool.Description); err != nil {
return diag.FromErr(err)
}
if err := d.Set("include_deferred", pool.IncludeDeferred); err != nil {
return diag.FromErr(err)
}
if err := d.Set("running_slots", pool.RunningSlots); err != nil {
return diag.FromErr(err)
}
if err := d.Set("deferred_slots", pool.DeferredSlots); err != nil {
return diag.FromErr(err)
}
if err := d.Set("scheduled_slots", pool.ScheduledSlots); err != nil {
return diag.FromErr(err)
}

Expand All @@ -109,11 +144,17 @@ func resourcePoolUpdate(ctx context.Context, d *schema.ResourceData, m interface
client := pcfg.ApiClient

slots := int32(d.Get("slots").(int))
includeDeferred := d.Get("include_deferred").(bool)
name := d.Id()

pool := airflow.Pool{
Name: &name,
Slots: &slots,
Name: &name,
Slots: &slots,
IncludeDeferred: &includeDeferred,
}

if description, ok := d.GetOk("description"); ok {
pool.SetDescription(description.(string))
}

_, _, err := client.PoolApi.PatchPool(pcfg.AuthContext, name).Pool(pool).Execute()
Expand Down
6 changes: 3 additions & 3 deletions internal/provider/resource_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func resourceRoleCreate(ctx context.Context, d *schema.ResourceData, m interface

if v, ok := d.GetOk("action"); ok && v.(*schema.Set).Len() > 0 {
actions := expandAirflowRoleActions(d.Get("action").(*schema.Set).List())
role.Actions = &actions
role.Actions = actions
}

_, _, err := varApi.PostRole(pcfg.AuthContext).Role(role).Execute()
Expand Down Expand Up @@ -83,7 +83,7 @@ func resourceRoleRead(ctx context.Context, d *schema.ResourceData, m interface{}
if err := d.Set("name", role.Name); err != nil {
return diag.FromErr(err)
}
if err := d.Set("action", flattenAirflowRoleActions(*role.Actions)); err != nil {
if err := d.Set("action", flattenAirflowRoleActions(role.Actions)); err != nil {
return diag.Errorf("error setting action: %s", err)
}

Expand All @@ -98,7 +98,7 @@ func resourceRoleUpdate(ctx context.Context, d *schema.ResourceData, m interface
actions := expandAirflowRoleActions(d.Get("action").(*schema.Set).List())
role := airflow.Role{
Name: &name,
Actions: &actions,
Actions: actions,
}

_, _, err := client.RoleApi.PatchRole(pcfg.AuthContext, name).Role(role).Execute()
Expand Down
6 changes: 3 additions & 3 deletions internal/provider/resource_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func resourceUserCreate(ctx context.Context, d *schema.ResourceData, m interface
LastName: &lastName,
Username: &username,
Password: &password,
Roles: &roles,
Roles: roles,
}).Execute()
if err != nil {
return diag.Errorf("failed to create user `%s` from Airflow: %s", email, err)
Expand Down Expand Up @@ -129,7 +129,7 @@ func resourceUserRead(ctx context.Context, d *schema.ResourceData, m interface{}
if err := d.Set("password", d.Get("password").(string)); err != nil {
return diag.FromErr(err)
}
if err := d.Set("roles", flattenAirflowUserRoles(*user.Roles)); err != nil {
if err := d.Set("roles", flattenAirflowUserRoles(user.Roles)); err != nil {
return diag.FromErr(err)
}

Expand All @@ -152,7 +152,7 @@ func resourceUserUpdate(ctx context.Context, d *schema.ResourceData, m interface
FirstName: &firstName,
LastName: &lastName,
Password: &password,
Roles: &roles,
Roles: roles,
Username: &username,
}).Execute()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/provider/resource_user_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func resourceUserRolesCreate(ctx context.Context, d *schema.ResourceData, m inte
userApi := client.UserApi

_, _, err := userApi.PatchUser(pcfg.AuthContext, username).UpdateMask([]string{"roles"}).User(airflow.User{
Roles: &roles,
Roles: roles,
Username: &username,
FirstName: &username,
LastName: &username,
Expand Down Expand Up @@ -73,7 +73,7 @@ func resourceUserRolesRead(ctx context.Context, d *schema.ResourceData, m interf
if err := d.Set("username", user.Username); err != nil {
return diag.FromErr(err)
}
if err := d.Set("roles", flattenAirflowUserRoles(*user.Roles)); err != nil {
if err := d.Set("roles", flattenAirflowUserRoles(user.Roles)); err != nil {
return diag.FromErr(err)
}

Expand All @@ -88,7 +88,7 @@ func resourceUserRolesUpdate(ctx context.Context, d *schema.ResourceData, m inte
username := d.Id()

_, _, err := client.UserApi.PatchUser(pcfg.AuthContext, username).UpdateMask([]string{"roles"}).User(airflow.User{
Roles: &roles,
Roles: roles,
Username: &username,
FirstName: &username,
LastName: &username,
Expand All @@ -110,7 +110,7 @@ func resourceUserRolesDelete(ctx context.Context, d *schema.ResourceData, m inte

var err error
_, _, _ = client.UserApi.PatchUser(pcfg.AuthContext, username).UpdateMask([]string{"roles"}).User(airflow.User{
Roles: &roles,
Roles: roles,
Username: &username,
FirstName: &username,
LastName: &username,
Expand Down
4 changes: 2 additions & 2 deletions internal/provider/resource_user_roles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func testAccCheckAirflowUserRolesCheckDestroy(s *terraform.State) error {

user, res, err := client.ApiClient.UserApi.GetUser(client.AuthContext, rs.Primary.ID).Execute()
if err == nil {
if len(*user.Roles) != 0 {
if len(user.Roles) != 0 {
return fmt.Errorf("Airflow User (%s) still have some roles.", rs.Primary.ID)
}
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func testAccPreCheckCreateUser(t *testing.T) {
LastName: &lastName,
Username: &username,
Password: &password,
Roles: &roles,
Roles: roles,
}).Execute()
if err != nil {
t.Fatalf("failed to create user `%s` from Airflow: %s", username, err)
Expand Down