93 changes: 58 additions & 35 deletions docker-compose.yaml
@@ -24,7 +24,7 @@
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:2.5.1
# Default: apache/airflow:|version|
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed.
@@ -36,21 +36,25 @@
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Use this option ONLY for quick checks. Installing requirements at container
# startup is done EVERY TIME the service is started.
# A better way is to build a custom image or extend the official image
# as described in https://airflow.apache.org/docs/docker-stack/build.html.
# Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
version: '3.8'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1}
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.3}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# For backward compatibility with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
@@ -60,10 +64,20 @@ x-airflow-common:
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
# yamllint disable rule:line-length
# Use a simple HTTP server on the scheduler for health checks
# See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
# yamllint enable rule:line-length
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
# WARNING: Use the _PIP_ADDITIONAL_REQUIREMENTS option ONLY for quick checks.
# For any other purpose (development, test and especially production usage) build/extend the Airflow image.
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
AIRFLOW__WEBSERVER__WORKERS: 2
AIRFLOW__WEBSERVER__WORKER_REFRESH_INTERVAL: 1800
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
@@ -75,7 +89,7 @@ x-airflow-common:

services:
postgres:
image: postgres:13
image: postgres:17
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
@@ -84,8 +98,9 @@ services:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
interval: 10s
retries: 5
start_period: 5s
restart: always

redis:
@@ -94,21 +109,23 @@ services:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
interval: 10s
timeout: 30s
retries: 50
start_period: 30s
restart: always

airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
@@ -119,45 +136,50 @@ services:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
# Worker not needed with LocalExecutor
# airflow-worker:
# <<: *airflow-common
# command: celery worker --concurrency 2
# healthcheck:
# # yamllint disable rule:line-length
# test:
# - "CMD-SHELL"
# - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
# interval: 30s
# timeout: 10s
# retries: 5
# start_period: 30s
# environment:
# <<: *airflow-common-env
# # Required to handle warm shutdown of the celery workers properly
# # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
# DUMB_INIT_SETSID: "0"
# restart: always
# depends_on:
# <<: *airflow-common-depends-on
# airflow-init:
# condition: service_completed_successfully

airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
@@ -233,7 +255,7 @@ services:
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_DB_MIGRATE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
Expand Down Expand Up @@ -264,12 +286,13 @@ services:
profiles:
- flower
ports:
- 5555:5555
- "5555:5555"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
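With the scheduler healthcheck now pointed at the standalone health check server on port 8974 (and the webserver keeping its `/health` endpoint on 8080), a quick way to confirm the stack is up before running the acceptance tests is to poll both endpoints. A minimal Go sketch, assuming the compose defaults above (localhost, ports 8080/8974); this is a local convenience, not part of the provider:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// waitHealthy polls a health endpoint until it returns 200 OK or the
// attempts run out. The 10s pause loosely mirrors the compose intervals.
func waitHealthy(url string, attempts int) error {
	for i := 0; i < attempts; i++ {
		resp, err := http.Get(url)
		if err == nil {
			ok := resp.StatusCode == http.StatusOK
			resp.Body.Close()
			if ok {
				return nil
			}
		}
		time.Sleep(10 * time.Second)
	}
	return fmt.Errorf("%s not healthy after %d attempts", url, attempts)
}

func main() {
	endpoints := []string{
		"http://localhost:8080/health", // webserver healthcheck target
		"http://localhost:8974/health", // scheduler health check server
	}
	for _, u := range endpoints {
		if err := waitHealthy(u, 5); err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println(u, "ok")
	}
}
```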
6 changes: 4 additions & 2 deletions internal/provider/resource_pool.go
@@ -120,8 +120,10 @@ func resourcePoolRead(ctx context.Context, d *schema.ResourceData, m interface{}
if err := d.Set("open_slots", pool.OpenSlots); err != nil {
return diag.FromErr(err)
}
if err := d.Set("description", pool.Description); err != nil {
return diag.FromErr(err)
if pool.Description.IsSet() && pool.Description.Get() != nil {
if err := d.Set("description", *pool.Description.Get()); err != nil {
return diag.FromErr(err)
}
}
if err := d.Set("include_deferred", pool.IncludeDeferred); err != nil {
return diag.FromErr(err)
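The new guard works because the generated Airflow client models `description` as a nullable string: a wrapper carrying a value pointer plus an explicit "is set" flag, so a field absent from the API response is distinguishable from an empty string. A minimal sketch of that pattern, assuming the `IsSet()`/`Get()` shape used above (an illustrative re-implementation, not the generated client's code):

```go
package main

import "fmt"

// NullableString mirrors the value-plus-flag pattern of OpenAPI-generated
// Go clients: an unset field is distinguishable from an empty string.
type NullableString struct {
	value *string
	isSet bool
}

func (v NullableString) Get() *string { return v.value }
func (v NullableString) IsSet() bool  { return v.isSet }
func (v *NullableString) Set(val *string) {
	v.value = val
	v.isSet = true
}

func main() {
	var desc NullableString
	fmt.Println(desc.IsSet()) // false: the API response omitted the field

	s := "Test description"
	desc.Set(&s)
	if desc.IsSet() && desc.Get() != nil {
		fmt.Println(*desc.Get()) // safe dereference, as in resourcePoolRead
	}
}
```

The old code handed the wrapper itself straight to `d.Set`, so Terraform never received a plain string; the new code unwraps the value and skips unset or null descriptions, which is presumably why the guard was added.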
98 changes: 98 additions & 0 deletions internal/provider/resource_pool_test.go
@@ -24,6 +24,12 @@ func TestAccAirflowPool_basic(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "name", rName),
resource.TestCheckResourceAttr(resourceName, "slots", "2"),
resource.TestCheckResourceAttr(resourceName, "open_slots", "2"),
resource.TestCheckResourceAttr(resourceName, "occupied_slots", "0"),
resource.TestCheckResourceAttr(resourceName, "queued_slots", "0"),
resource.TestCheckResourceAttr(resourceName, "running_slots", "0"),
resource.TestCheckResourceAttr(resourceName, "deferred_slots", "0"),
resource.TestCheckResourceAttr(resourceName, "scheduled_slots", "0"),
resource.TestCheckResourceAttr(resourceName, "include_deferred", "false"),
),
},
{
@@ -37,6 +43,78 @@
resource.TestCheckResourceAttr(resourceName, "name", rName),
resource.TestCheckResourceAttr(resourceName, "slots", "3"),
resource.TestCheckResourceAttr(resourceName, "open_slots", "3"),
resource.TestCheckResourceAttr(resourceName, "occupied_slots", "0"),
resource.TestCheckResourceAttr(resourceName, "queued_slots", "0"),
resource.TestCheckResourceAttr(resourceName, "running_slots", "0"),
resource.TestCheckResourceAttr(resourceName, "deferred_slots", "0"),
resource.TestCheckResourceAttr(resourceName, "scheduled_slots", "0"),
resource.TestCheckResourceAttr(resourceName, "include_deferred", "false"),
),
},
},
})
}

func TestAccAirflowPool_description(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")

resourceName := "airflow_pool.test"
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAirflowPoolCheckDestroy,
Steps: []resource.TestStep{
{
Config: testAccAirflowPoolConfigDescription(rName, 2, "Test description"),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(resourceName, "name", rName),
resource.TestCheckResourceAttr(resourceName, "slots", "2"),
resource.TestCheckResourceAttr(resourceName, "description", "Test description"),
),
},
{
Config: testAccAirflowPoolConfigDescription(rName, 2, "Updated description"),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(resourceName, "name", rName),
resource.TestCheckResourceAttr(resourceName, "slots", "2"),
resource.TestCheckResourceAttr(resourceName, "description", "Updated description"),
),
},
},
})
}

func TestAccAirflowPool_include_deferred(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")

resourceName := "airflow_pool.test"
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAirflowPoolCheckDestroy,
Steps: []resource.TestStep{
{
Config: testAccAirflowPoolConfigIncludeDeferred(rName, 2, true),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(resourceName, "name", rName),
resource.TestCheckResourceAttr(resourceName, "slots", "2"),
resource.TestCheckResourceAttr(resourceName, "include_deferred", "true"),
),
},
{
Config: testAccAirflowPoolConfigIncludeDeferred(rName, 2, false),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(resourceName, "name", rName),
resource.TestCheckResourceAttr(resourceName, "slots", "2"),
resource.TestCheckResourceAttr(resourceName, "include_deferred", "false"),
),
},
{
Config: testAccAirflowPoolConfigIncludeDeferred(rName, 2, true),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(resourceName, "name", rName),
resource.TestCheckResourceAttr(resourceName, "slots", "2"),
resource.TestCheckResourceAttr(resourceName, "include_deferred", "true"),
),
},
},
@@ -74,3 +152,23 @@ resource "airflow_pool" "test" {
}
`, rName, slots)
}

func testAccAirflowPoolConfigDescription(rName string, slots int, description string) string {
return fmt.Sprintf(`
resource "airflow_pool" "test" {
name = %[1]q
slots = %[2]d
description = %[3]q
}
`, rName, slots, description)
}

func testAccAirflowPoolConfigIncludeDeferred(rName string, slots int, includeDeferred bool) string {
return fmt.Sprintf(`
resource "airflow_pool" "test" {
name = %[1]q
slots = %[2]d
include_deferred = %[3]t
}
`, rName, slots, includeDeferred)
}
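All three test cases lean on `testAccCheckAirflowPoolCheckDestroy`, which lives outside this diff. For context, a minimal sketch of the usual shape of such a helper, assuming terraform-plugin-sdk/v2 as the surrounding test code suggests (the `poolStillExists` lookup is a hypothetical stand-in for a call through the provider's Airflow API client):

```go
package provider

import (
	"fmt"

	"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
)

// poolStillExists is a hypothetical stand-in for a lookup against the
// Airflow API; the provider's real helper uses its generated client.
func poolStillExists(name string) bool {
	return false
}

// testAccCheckAirflowPoolCheckDestroy walks the final state and fails if
// any airflow_pool resource can still be fetched after terraform destroy.
func testAccCheckAirflowPoolCheckDestroy(s *terraform.State) error {
	for _, rs := range s.RootModule().Resources {
		if rs.Type != "airflow_pool" {
			continue
		}
		if poolStillExists(rs.Primary.ID) {
			return fmt.Errorf("airflow_pool %q still exists after destroy", rs.Primary.ID)
		}
	}
	return nil
}
```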