diff --git a/docker-compose.yaml b/docker-compose.yaml index 95b44f1..47efba7 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -24,7 +24,7 @@ # The following variables are supported: # # AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. -# Default: apache/airflow:2.5.1 +# Default: apache/airflow:|version| # AIRFLOW_UID - User ID in Airflow containers # Default: 50000 # AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed. @@ -36,21 +36,25 @@ # _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested). # Default: airflow # _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers. +# Use this option ONLY for quick checks. Installing requirements at container +# startup is done EVERY TIME the service is started. +# A better way is to build a custom image or extend the official image +# as described in https://airflow.apache.org/docs/docker-stack/build.html. # Default: '' # # Feel free to modify this file to suit your needs. --- -version: '3' +version: '3.8' x-airflow-common: &airflow-common # In order to add custom dependencies or upgrade provider packages you can use your extended image. # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml # and uncomment the "build" line below, Then run `docker-compose build` to build the images. - image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1} + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.3} # build: . environment: &airflow-common-env - AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow # For backward compatibility, with Airflow <2.3 AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow @@ -60,10 +64,20 @@ x-airflow-common: AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' AIRFLOW__CORE__LOAD_EXAMPLES: 'true' AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' + # yamllint disable rule:line-length + # Use simple http server on scheduler for health checks + # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server + # yamllint enable rule:line-length + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' + # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks + # for other purpose (development, test and especially production usage) build/extend Airflow image. _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} + AIRFLOW__WEBSERVER__WORKERS: 2 + AIRFLOW__WEBSERVER__WORKER_REFRESH_INTERVAL: 1800 volumes: - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins user: "${AIRFLOW_UID:-50000}:0" depends_on: @@ -75,7 +89,7 @@ x-airflow-common: services: postgres: - image: postgres:13 + image: postgres:17 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow @@ -84,8 +98,9 @@ services: - postgres-db-volume:/var/lib/postgresql/data healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] - interval: 5s + interval: 10s retries: 5 + start_period: 5s restart: always redis: @@ -94,21 +109,23 @@ services: - 6379 healthcheck: test: ["CMD", "redis-cli", "ping"] - interval: 5s + interval: 10s timeout: 30s retries: 50 + start_period: 30s restart: always airflow-webserver: <<: *airflow-common command: webserver ports: - - 8080:8080 + - "8080:8080" healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] - interval: 10s + interval: 30s timeout: 10s retries: 5 + start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on @@ -119,45 +136,50 @@ services: <<: *airflow-common command: scheduler healthcheck: - test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"'] - interval: 10s + test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] + interval: 30s timeout: 10s retries: 5 + start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on airflow-init: condition: service_completed_successfully - airflow-worker: - <<: *airflow-common - command: celery worker - healthcheck: - test: - - "CMD-SHELL" - - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' - interval: 10s - timeout: 10s - retries: 5 - environment: - <<: *airflow-common-env - # Required to handle warm shutdown of the celery workers properly - # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation - DUMB_INIT_SETSID: "0" - restart: always - depends_on: - <<: *airflow-common-depends-on - airflow-init: - condition: service_completed_successfully + # Worker not needed with LocalExecutor + # airflow-worker: + # <<: *airflow-common + # command: celery worker --concurrency 2 + # healthcheck: + # # yamllint disable rule:line-length + # test: + # - "CMD-SHELL" + # - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + # interval: 30s + # timeout: 10s + # retries: 5 + # start_period: 30s + # environment: + # <<: *airflow-common-env + # # Required to handle warm shutdown of the celery workers properly + # # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation + # DUMB_INIT_SETSID: "0" + # restart: always + # depends_on: + # <<: *airflow-common-depends-on + # airflow-init: + # condition: service_completed_successfully airflow-triggerer: <<: *airflow-common command: triggerer healthcheck: test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] - interval: 10s + interval: 30s timeout: 10s retries: 5 + start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on @@ -233,7 +255,7 @@ services: # yamllint enable rule:line-length environment: <<: *airflow-common-env - _AIRFLOW_DB_UPGRADE: 'true' + _AIRFLOW_DB_MIGRATE: 'true' _AIRFLOW_WWW_USER_CREATE: 'true' _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} @@ -264,12 +286,13 @@ services: profiles: - flower ports: - - 5555:5555 + - "5555:5555" healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:5555/"] - interval: 10s + interval: 30s timeout: 10s retries: 5 + start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on diff --git a/internal/provider/resource_pool.go b/internal/provider/resource_pool.go index 56a25a7..5848680 100644 --- a/internal/provider/resource_pool.go +++ b/internal/provider/resource_pool.go @@ -120,8 +120,10 @@ func resourcePoolRead(ctx context.Context, d *schema.ResourceData, m interface{} if err := d.Set("open_slots", pool.OpenSlots); err != nil { return diag.FromErr(err) } - if err := d.Set("description", pool.Description); err != nil { - return diag.FromErr(err) + if pool.Description.IsSet() && pool.Description.Get() != nil { + if err := d.Set("description", *pool.Description.Get()); err != nil { + return diag.FromErr(err) + } } if err := d.Set("include_deferred", pool.IncludeDeferred); err != nil { return diag.FromErr(err) diff --git a/internal/provider/resource_pool_test.go b/internal/provider/resource_pool_test.go index f04ef94..cdcb31a 100644 --- a/internal/provider/resource_pool_test.go +++ b/internal/provider/resource_pool_test.go @@ -24,6 +24,12 @@ func TestAccAirflowPool_basic(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "name", rName), resource.TestCheckResourceAttr(resourceName, "slots", "2"), resource.TestCheckResourceAttr(resourceName, "open_slots", "2"), + resource.TestCheckResourceAttr(resourceName, "occupied_slots", "0"), + resource.TestCheckResourceAttr(resourceName, "queued_slots", "0"), + resource.TestCheckResourceAttr(resourceName, "running_slots", "0"), + resource.TestCheckResourceAttr(resourceName, "deferred_slots", "0"), + resource.TestCheckResourceAttr(resourceName, "scheduled_slots", "0"), + resource.TestCheckResourceAttr(resourceName, "include_deferred", "false"), ), }, { @@ -37,6 +43,78 @@ func TestAccAirflowPool_basic(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "name", rName), resource.TestCheckResourceAttr(resourceName, "slots", "3"), resource.TestCheckResourceAttr(resourceName, "open_slots", "3"), + resource.TestCheckResourceAttr(resourceName, "occupied_slots", "0"), + resource.TestCheckResourceAttr(resourceName, "queued_slots", "0"), + resource.TestCheckResourceAttr(resourceName, "running_slots", "0"), + resource.TestCheckResourceAttr(resourceName, "deferred_slots", "0"), + resource.TestCheckResourceAttr(resourceName, "scheduled_slots", "0"), + resource.TestCheckResourceAttr(resourceName, "include_deferred", "false"), + ), + }, + }, + }) +} + +func TestAccAirflowPool_description(t *testing.T) { + rName := acctest.RandomWithPrefix("tf-acc-test") + + resourceName := "airflow_pool.test" + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAirflowPoolCheckDestroy, + Steps: []resource.TestStep{ + { + Config: testAccAirflowPoolConfigDescription(rName, 2, "Test description"), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "slots", "2"), + resource.TestCheckResourceAttr(resourceName, "description", "Test description"), + ), + }, + { + Config: testAccAirflowPoolConfigDescription(rName, 2, "Updated description"), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "slots", "2"), + resource.TestCheckResourceAttr(resourceName, "description", "Updated description"), + ), + }, + }, + }) +} + +func TestAccAirflowPool_include_deferred(t *testing.T) { + rName := acctest.RandomWithPrefix("tf-acc-test") + + resourceName := "airflow_pool.test" + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAirflowPoolCheckDestroy, + Steps: []resource.TestStep{ + { + Config: testAccAirflowPoolConfigIncludeDeferred(rName, 2, true), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "slots", "2"), + resource.TestCheckResourceAttr(resourceName, "include_deferred", "true"), + ), + }, + { + Config: testAccAirflowPoolConfigIncludeDeferred(rName, 2, false), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "slots", "2"), + resource.TestCheckResourceAttr(resourceName, "include_deferred", "false"), + ), + }, + { + Config: testAccAirflowPoolConfigIncludeDeferred(rName, 2, true), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "slots", "2"), + resource.TestCheckResourceAttr(resourceName, "include_deferred", "true"), ), }, }, @@ -74,3 +152,23 @@ resource "airflow_pool" "test" { } `, rName, slots) } + +func testAccAirflowPoolConfigDescription(rName string, slots int, description string) string { + return fmt.Sprintf(` +resource "airflow_pool" "test" { + name = %[1]q + slots = %[2]d + description = %[3]q +} +`, rName, slots, description) +} + +func testAccAirflowPoolConfigIncludeDeferred(rName string, slots int, includeDeferred bool) string { + return fmt.Sprintf(` +resource "airflow_pool" "test" { + name = %[1]q + slots = %[2]d + include_deferred = %[3]t +} +`, rName, slots, includeDeferred) +}