Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
TEST?=$$(go list ./...)

build:
go build -o terraform-provider-airflow

testacc:
TF_ACC=1 go test $(TEST) -v $(TESTARGS) -timeout 5m
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.16
require (
github.com/apache/airflow-client-go/airflow v0.0.0-20210618063701-c4bfdb8caedb
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/terraform-plugin-sdk v1.10.0
github.com/hashicorp/terraform-plugin-sdk/v2 v2.4.3
golang.org/x/net v0.0.0-20211020060615-d418f374d309 // indirect
golang.org/x/oauth2 v0.0.0-20211028175245-ba495a64dcb5 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
155 changes: 99 additions & 56 deletions go.sum

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package main

import (
"github.com/hashicorp/terraform-plugin-sdk/plugin"
"github.com/hashicorp/terraform-plugin-sdk/terraform"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/plugin"
)

func main() {
plugin.Serve(&plugin.ServeOpts{
ProviderFunc: func() terraform.ResourceProvider {
return Provider()
ProviderFunc: func() *schema.Provider {
return AirflowProvider()
},
})
}
80 changes: 33 additions & 47 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,74 +6,60 @@ import (
"net/url"

"github.com/apache/airflow-client-go/airflow"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

type ProviderConfig struct {
ApiClient *airflow.APIClient
AuthContext context.Context
}

func Provider() *schema.Provider {
func AirflowProvider() *schema.Provider {
return &schema.Provider{
Schema: map[string]*schema.Schema{
"base_endpoint": {
Type: schema.TypeString,
Required: true,
},
// username and password are used for API basic auth
"username": {
Type: schema.TypeString,
Optional: true,
Description: "The username to use for API basic authentication",
},
"password": {
Type: schema.TypeString,
Optional: true,
Description: "The password to use for API basic authentication",
Required: true,
DefaultFunc: schema.EnvDefaultFunc("AIRFLOW_BASE_ENDPOINT", nil),
},
"oauth2_token": {
Type: schema.TypeString,
Optional: true,
Description: "The username to use for API basic authentication",
Description: "The oauth to use for API authentication",
DefaultFunc: schema.EnvDefaultFunc("AIRFLOW_OAUTH2_TOKEN", nil),
},
},
ResourcesMap: map[string]*schema.Resource{
"airflow_variable": resourceVariable(),
"airflow_variable": resourceVariable(),
"airflow_connection": resourceConnection(),
},
ConfigureFunc: func(p *schema.ResourceData) (interface{}, error) {
endpoint := p.Get("base_endpoint").(string)
u, err := url.Parse(endpoint)
if err != nil {
return nil, fmt.Errorf("invalid base_endpoint: %w", err)
}

// basePath := path.Join(u.Path + "/api/experimental")
// log.Printf("[DEBUG] Using API prefix: %s", basePath)
ConfigureFunc: providerConfigure,
}
}

authCtx := context.Background()
func providerConfigure(d *schema.ResourceData) (interface{}, error) {
endpoint := d.Get("base_endpoint").(string)
u, err := url.Parse(endpoint)
if err != nil {
return nil, fmt.Errorf("invalid base_endpoint: %w", err)
}

// if username, ok := p.GetOk("username"); ok {
// var password interface{}
// if password, ok = p.GetOk("password"); !ok {
// return nil, fmt.Errorf("Found username for basic auth, but password not specified.")
// }
// log.Printf("[DEBUG] Using API Basic Auth")
authCtx := context.Background()

// cred := airflow.ContextOAuth2.BasicAuth{
// UserName: username.(string),
// Password: password.(string),
// }
authCtx = context.WithValue(authCtx, airflow.ContextOAuth2, p.Get("oauth2_token").(string))
// }
authCtx = context.WithValue(authCtx, airflow.ContextAccessToken, d.Get("oauth2_token"))

return ProviderConfig{
ApiClient: airflow.NewAPIClient(&airflow.Configuration{
Scheme: u.Scheme,
Host: u.Host,
}),
AuthContext: authCtx,
}, nil
},
}
return ProviderConfig{
ApiClient: airflow.NewAPIClient(&airflow.Configuration{
Scheme: u.Scheme,
Host: u.Host,
Debug: true,
Servers: airflow.ServerConfigurations{
{
URL: "/api/v1",
Description: "Apache Airflow Stable API.",
},
},
}),
AuthContext: authCtx,
}, nil
}
37 changes: 37 additions & 0 deletions provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"os"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

var testAccProviders map[string]*schema.Provider
var testAccProvider *schema.Provider

func init() {
testAccProvider = AirflowProvider()
testAccProviders = map[string]*schema.Provider{
"airflow": testAccProvider,
}
}

func TestProvider(t *testing.T) {
if err := AirflowProvider().InternalValidate(); err != nil {
t.Fatalf("err: %s", err)
}
}

func TestProvider_impl(t *testing.T) {
var _ *schema.Provider = AirflowProvider()
}

func testAccPreCheck(t *testing.T) {
if v := os.Getenv("AIRFLOW_OAUTH2_TOKEN"); v == "" {
t.Fatal("AIRFLOW_OAUTH2_TOKEN must be set for acceptance tests")
}
if v := os.Getenv("AIRFLOW_BASE_ENDPOINT"); v == "" {
t.Fatal("AIRFLOW_BASE_ENDPOINT must be set for acceptance tests")
}
}
121 changes: 121 additions & 0 deletions resource_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package main

import (
"fmt"

"github.com/apache/airflow-client-go/airflow"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

func resourceConnectionCreate(d *schema.ResourceData, m interface{}) error {
pcfg := m.(ProviderConfig)
client := pcfg.ApiClient
connId := d.Get("connection_id").(string)
connType := d.Get("conn_type").(string)

conn := airflow.Connection{
ConnectionId: &connId,
ConnType: &connType,
}
connApi := client.ConnectionApi

_, _, err := connApi.PostConnection(pcfg.AuthContext).Connection(conn).Execute()
if err != nil {
return fmt.Errorf("failed to create connection `%s` from Airflow: %w", connId, err)
}
d.SetId(connId)
return resourceConnectionRead(d, m)
}

func resourceConnectionRead(d *schema.ResourceData, m interface{}) error {
pcfg := m.(ProviderConfig)
client := pcfg.ApiClient
connId := d.Get("connection_id").(string)
connection, resp, err := client.ConnectionApi.GetConnection(pcfg.AuthContext, connId).Execute()
if resp != nil && resp.StatusCode == 404 {
d.SetId("")
return nil
}
if err != nil {
return fmt.Errorf("failed to get connection `%s` from Airflow: %w", connId, err)
}

d.Set("connection_id", connection.ConnectionId)
d.Set("conn_type", connection.ConnType)
return nil
}

func resourceConnectionUpdate(d *schema.ResourceData, m interface{}) error {
pcfg := m.(ProviderConfig)
client := pcfg.ApiClient
connId := d.Get("connection_id").(string)
connType := d.Get("conn_type").(string)

conn := airflow.Connection{
ConnectionId: &connId,
ConnType: &connType,
}
_, _, err := client.ConnectionApi.PatchConnection(pcfg.AuthContext, connId).Connection(conn).Execute()
if err != nil {
return fmt.Errorf("failed to update connection `%s` from Airflow: %w", connId, err)
}
d.SetId(connId)
return resourceConnectionRead(d, m)
}

func resourceConnectionDelete(d *schema.ResourceData, m interface{}) error {
pcfg := m.(ProviderConfig)
client := pcfg.ApiClient
connId := d.Get("connection_id").(string)
_, err := client.ConnectionApi.DeleteConnection(pcfg.AuthContext, connId).Execute()
if err != nil {
return fmt.Errorf("failed to delete connection `%s` from Airflow: %w", connId, err)
}

return nil
}

func resourceConnection() *schema.Resource {
return &schema.Resource{
Create: resourceConnectionCreate,
Read: resourceConnectionRead,
Update: resourceConnectionUpdate,
Delete: resourceConnectionDelete,

Schema: map[string]*schema.Schema{
"connection_id": {
Type: schema.TypeString,
Required: true,
},
"conn_type": {
Type: schema.TypeString,
Required: true,
},
"host": {
Type: schema.TypeString,
Optional: true,
},
"login": {
Type: schema.TypeString,
Optional: true,
},
"schema": {
Type: schema.TypeString,
Optional: true,
},
"port": {
Type: schema.TypeInt,
Optional: true,
},
"password": {
Type: schema.TypeString,
Optional: true,
Sensitive: true,
},
"extra": {
Type: schema.TypeString,
Optional: true,
},
},
}
}
38 changes: 38 additions & 0 deletions resource_connection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

func TestAccAirflowConnection_basic(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")

resourceName := "airflow_connection.test"
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: testAccAirflowConnectionConfigBasic(rName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(resourceName, "connection_id", rName),
resource.TestCheckResourceAttr(resourceName, "conn_type", rName),
),
},
},
})
}

func testAccAirflowConnectionConfigBasic(rName string) string {
return fmt.Sprintf(`
resource "airflow_connection" "test" {
connection_id = %[1]q
conn_type = %[1]q
}
`, rName)
}
12 changes: 8 additions & 4 deletions resource_variable.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"

"github.com/apache/airflow-client-go/airflow"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

func resourceVariableCreate(d *schema.ResourceData, m interface{}) error {
Expand All @@ -19,7 +19,7 @@ func resourceVariableCreate(d *schema.ResourceData, m interface{}) error {
Value: &val,
}).Execute()
if err != nil {
return err
return fmt.Errorf("failed to create variable `%s` from Airflow: %w", key, err)
}
d.SetId(key)
return resourceVariableRead(d, m)
Expand Down Expand Up @@ -53,7 +53,7 @@ func resourceVariableUpdate(d *schema.ResourceData, m interface{}) error {
Value: &val,
}).Execute()
if err != nil {
return err
return fmt.Errorf("failed to update variable `%s` from Airflow: %w", key, err)
}
d.SetId(key)
return resourceVariableRead(d, m)
Expand All @@ -64,7 +64,11 @@ func resourceVariableDelete(d *schema.ResourceData, m interface{}) error {
client := pcfg.ApiClient
key := d.Get("key").(string)
_, err := client.VariableApi.DeleteVariable(pcfg.AuthContext, key).Execute()
return err
if err != nil {
return fmt.Errorf("failed to delete variable `%s` from Airflow: %w", key, err)
}

return nil
}

func resourceVariable() *schema.Resource {
Expand Down
Loading