diff --git a/.github/ISSUE_TEMPLATE/01-bug.yml b/.github/ISSUE_TEMPLATE/01-bug.yml index ad663a79e0..2765661b89 100644 --- a/.github/ISSUE_TEMPLATE/01-bug.yml +++ b/.github/ISSUE_TEMPLATE/01-bug.yml @@ -52,9 +52,12 @@ body: label: ExecutionMode description: Which ExecutionMode are you using? options: + - "AIRFLOW_ASYNC" + - "AWS_ECS" - "AWS_EKS" - "AZURE_CONTAINER_INSTANCE" - "DOCKER" + - "GCP_CLOUD_RUN_JOB" - "KUBERNETES" - "LOCAL" - "VIRTUALENV" diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 4ce9477265..86fb351dce 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -7,7 +7,7 @@ on: jobs: pages: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest environment: name: github-pages url: ${{ steps.deployment.outputs.page_url }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 475090f5cb..d14ae5de72 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,7 +32,7 @@ jobs: architecture: "x64" - run: pip3 install hatch - - run: hatch run tests.py3.9-2.8:type-check + - run: hatch run tests.py3.9-2.8-1.9:type-check Run-Unit-Tests: runs-on: ubuntu-latest @@ -40,6 +40,7 @@ jobs: matrix: python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] + dbt-version: ["1.9"] exclude: - python-version: "3.11" airflow-version: "2.4" @@ -75,7 +76,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: unit-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: unit-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -86,16 +87,16 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - - name: Test Cosmos against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }} + - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-cov + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-cov - name: Upload coverage to Github uses: actions/upload-artifact@v4 with: - name: coverage-unit-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} + name: coverage-unit-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} path: .coverage include-hidden-files: true @@ -106,6 +107,7 @@ jobs: matrix: python-version: ["3.8", "3.9", "3.10", "3.11"] airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] + dbt-version: ["1.9"] exclude: - python-version: "3.11" airflow-version: "2.4" @@ -139,7 +141,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: integration-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: integration-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}--${{ matrix.dbt-version }}${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -150,12 +152,12 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - - name: Test Cosmos against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }} + - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-setup + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration env: AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0 AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ @@ -182,7 +184,7 @@ jobs: - name: Upload coverage to Github uses: actions/upload-artifact@v4 with: - name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} + name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} path: .coverage include-hidden-files: true @@ -193,6 +195,7 @@ jobs: matrix: python-version: ["3.11"] airflow-version: ["2.6"] + dbt-version: ["1.9"] services: postgres: @@ -216,7 +219,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: integration-expensive-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: integration-expensive-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -227,12 +230,12 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - - name: Test Cosmos against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }} + - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup - DATABRICKS_UNIQUE_ID="${{github.run_id}}" hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-expensive + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-setup + DATABRICKS_UNIQUE_ID="${{github.run_id}}" hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-expensive env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres @@ -256,7 +259,7 @@ jobs: - name: Upload coverage to Github uses: actions/upload-artifact@v4 with: - name: coverage-integration-expensive-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} + name: coverage-integration-expensive-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} path: .coverage include-hidden-files: true @@ -274,6 +277,7 @@ jobs: matrix: python-version: [ "3.11" ] airflow-version: [ "2.8" ] + dbt-version: [ "1.5" ] services: postgres: image: postgres @@ -296,7 +300,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: integration-dbt-1-5-4-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: integration-dbt-1-5-4-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -307,11 +311,11 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt 1.5.4 run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-dbt-1-5-4 + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-dbt-1-5-4 env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres @@ -335,10 +339,68 @@ jobs: AIRFLOW__COSMOS__REMOTE_TARGET_PATH: "s3://cosmos-remote-cache/target_compiled/" AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID: aws_s3_conn + Run-Integration-Tests-DBT-Async: + needs: Authorize + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: [ "3.11" ] + airflow-version: [ "2.10" ] + dbt-version: ["1.5", "1.6", "1.7", "1.8", "1.9"] + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha || github.ref }} + - uses: actions/cache@v4 + with: + path: | + ~/.cache/pip + .local/share/hatch/ + key: integration-dbt-async-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install packages and dependencies + run: | + python -m pip install uv + uv pip install --system hatch + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze + + - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} + run: | + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-dbt-async + env: + AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ + AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }} + AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }} + AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 + PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH + AIRFLOW__COSMOS__ENABLE_CACHE: 0 + AIRFLOW__COSMOS__REMOTE_TARGET_PATH: "s3://cosmos-remote-cache/target_compiled/" + AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID: aws_s3_conn + DBT_ADAPTER_VERSION: ${{ matrix.dbt-version }} + - name: Upload coverage to Github uses: actions/upload-artifact@v4 with: - name: coverage-integration-dbt-1-5-4-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} + name: coverage-integration-dbt-async-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} path: .coverage include-hidden-files: true @@ -354,6 +416,7 @@ jobs: matrix: python-version: ["3.11"] airflow-version: ["2.8"] + dbt-version: ["1.9"] num-models: [1, 10, 50, 100, 500] services: postgres: @@ -376,7 +439,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: perf-test-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: perf-test-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -387,13 +450,13 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - - name: Run performance tests against against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }} + - name: Run performance tests against against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} id: run-performance-tests run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-performance-setup - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-performance + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-performance-setup + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-performance # read the performance results and set them as an env var for the next step # format: NUM_MODELS={num_models}\nTIME={end - start}\n @@ -423,6 +486,7 @@ jobs: matrix: python-version: [ "3.12" ] airflow-version: [ "2.10" ] + dbt-version: [ "1.9" ] steps: - uses: actions/checkout@v4 with: @@ -432,7 +496,7 @@ jobs: path: | ~/.cache/pip .local/share/hatch/ - key: coverage-integration-kubernetes-test-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + key: coverage-integration-kubernetes-test-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 @@ -446,12 +510,12 @@ jobs: run: | python -m pip install uv uv pip install --system hatch - hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze - name: Run kubernetes tests run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-kubernetes-setup - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-kubernetes + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-kubernetes-setup + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-kubernetes env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres @@ -477,7 +541,7 @@ jobs: - name: Upload coverage to Github uses: actions/upload-artifact@v4 with: - name: coverage-integration-kubernetes-test-${{ matrix.python-version }}-${{ matrix.airflow-version }} + name: coverage-integration-kubernetes-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} path: .coverage include-hidden-files: true diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 745cfda82e..2371a05dcd 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -57,7 +57,7 @@ repos: - --py37-plus - --keep-runtime-typing - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.9.6 + rev: v0.9.10 hooks: - id: ruff args: diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 8044eade4a..7e18695ce8 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,44 @@ Changelog ========= -1.9.0a5 (2025-02-03) +1.9.1 (2025-03-13) +-------------------- + +Bug Fixes + +* Fix import error in dbt bigquery adapter mock for ``dbt-bigquery<1.8`` for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1548 +* Fix ``operator_args`` override configuration by @ghjklw in #1558 +* Fix missing ``install_dbt_deps`` in ``ProjectConfig`` ``__init__`` method by @ghjklw in #1556 +* Fix dbt project parsing ``dbt_vars`` behavior passed via ``operator_args`` by @AlexandrKhabarov in #1543 +* Avoid reading the connection during DAG parsing of the async BigQuery operator by @joppevos in #1582 +* Fix: Workaround to incorrectly raised ``gcsfs.retry.HttpError`` (Invalid Credentials, 401) by @tatiana in #1598 +* Fix the async execution mode read sql files for dbt packages by @pankajastro in #1588 +* Improve BQ async error handling by @tatiana in #1597 +* Fix path selector when ``manifest.json`` is created using MS Windows by @tatiana in #1601 +* Fix log that prints 'Total filtered nodes' by @tatiana in #1603 +* Fix select behaviour using ``LoadMode.MANIFEST`` and a path with star by @tatiana in #1602 +* Support ``on_warning_callback`` with ``TestBehavior.BUILD`` and ``ExecutionMode.LOCAL`` by @corsettigyg in #1571 +* Fix ``DbtRunLocalOperator.partial()`` support by @tatiana @ashb in #1609 +* fix: ``container_name`` is null for ecs integration by @nicor88 in #1592 + +Docs + +* Improve MWAA getting-started docs by removing unused imports by @jx2lee in #1562 + +Others + +* Disable ``example_cosmos_dbt_build.py`` DAG in CI by @pankajastro in #1567 +* Upgrade GitHub Actions Ubuntu version by @tatiana in #1561 +* Update GitHub bug issue template by @pankajastro in #1586 +* Enable DAG ``example_cosmos_dbt_build.py`` in CI by @pankajastro in #1573 +* Run async DAG in DAG without setup/teardown task by @pankajastro in #1599 +* Add test case that fully covers recent select issue by @tatiana in #1604 +* Add CI job to test multiple dbt versions for the async DAG by @pankajkoti in #1535 +* Improve unit tests speed from 89s to 14s by @tatiana in #1600 +* Pre-commit updates: #1560, #1583, #1596 + + +1.9.0 (2025-02-19) -------------------- Breaking changes @@ -19,23 +56,43 @@ Features * Add structure to support multiple db for async operator execution by @pankajastro in #1483 * Support overriding the ``profile_config`` per dbt node or folder using config by @tatiana in #1492. More information `here `_. * Create and run accurate SQL statements when using ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti, @tatiana and @pankajastro in #1474 +* Add AWS ECS task run execution mode by @CarlosGitto and @aoelvp94 in #1507 +* Add support for running ``DbtSourceOperator`` individually by @victormacaubas in #1510 +* Add setup task for async executions by @pankajastro in #1518 +* Add teardown task for async executions by @pankajastro in #1529 +* Add ``ProjectConfig.install_dbt_deps`` & change operator ``install_deps=True`` as default by @tatiana in #1521 +* Extend Virtualenv operator and mock dbt adapters for setup & teardown tasks in ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti, @tatiana and @pankajastro in #1544 Bug Fixes * Fix select complex intersection of three tag-based graph selectors by @tatiana in #1466 +* Fix custom selector behaviour when the model name contains periods by @yakovlevvs and @60098727 in #1499 +* Filter dbt and non-dbt kwargs correctly for async operator by @pankajastro in #1526 Enhancement * Fix OpenLineage deprecation warning by @CorsettiS in #1449 * Move ``DbtRunner`` related functions into ``dbt/runner.py`` module by @tatiana in #1480 * Add ``on_warning_callback`` to ``DbtSourceKubernetesOperator`` and refactor previous operators by @LuigiCerone in #1501 +* Gracefully error when users set incompatible ``RenderConfig.dbt_deps`` and ``operator_args`` ``install_deps`` by @tatiana in #1505 +* Store compiled SQL as template field for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1534 + +Docs +* Improve ``RenderConfig`` arguments documentation by @tatiana in #1514 +* Improve callback documentation by @tatiana in #1516 +* Improve partial parsing docs by @tatiana in #1520 +* Fix typo in selecting & excluding docs by @pankajastro in #1523 +* Document ``async_py_requirements`` added in ``ExecutionConfig`` for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1545 Others * Ignore dbt package tests when running Cosmos tests by @tatiana in #1502 +* Refactor to consolidate async dbt adapter code by @pankajkoti in #1509 +* Log elapsed time for sql file(s) upload/download by @pankajastro in #1536 +* Remove the fallback operator for async task by @pankajastro in #1538 * GitHub Actions Dependabot: #1487 -* Pre-commit updates: #1473, #1493 +* Pre-commit updates: #1473, #1493, #1503, #1531 1.8.2 (2025-01-15) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 9410bffba3..df6d93258d 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.9.0a6" +__version__ = "1.9.1" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index f96d3ba2a7..209a6d9614 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -267,9 +267,11 @@ def create_task_metadata( extra_context: dict[str, Any] = { "dbt_node_config": node.context_dict, "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, + "package_name": node.package_name, } if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: + args["on_warning_callback"] = on_warning_callback exclude_detached_tests_if_needed(node, args, detached_from_parent) task_id, args = _get_task_id_and_args( node, args, use_task_group, normalize_task_id, "build", include_resource_type=True diff --git a/cosmos/config.py b/cosmos/config.py index 0fe17ce6f3..bd302b2853 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -152,6 +152,7 @@ class ProjectConfig: :param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. Defaults to snapshots :param manifest_path: The absolute path to the dbt manifest file. Defaults to None + :param manifest_conn_id: Name of the Airflow connection used to access the manifest file if it is not stored locally. Defaults to None :param project_name: Allows the user to define the project name. Required if dbt_project_path is not defined. Defaults to the folder name of dbt_project_path. :param env_vars: Dictionary of environment variables that are used for both rendering and execution. Rendering with @@ -175,6 +176,7 @@ class ProjectConfig: def __init__( self, dbt_project_path: str | Path | None = None, + install_dbt_deps: bool = True, models_relative_path: str | Path = "models", seeds_relative_path: str | Path = "seeds", snapshots_relative_path: str | Path = "snapshots", @@ -228,6 +230,7 @@ def __init__( self.env_vars = env_vars self.dbt_vars = dbt_vars self.partial_parse = partial_parse + self.install_dbt_deps = install_dbt_deps def validate_project(self) -> None: """ diff --git a/cosmos/converter.py b/cosmos/converter.py index 7c917a0225..b0716a01bb 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -223,7 +223,7 @@ def override_configuration( if execution_config.invocation_mode: operator_args["invocation_mode"] = execution_config.invocation_mode - if execution_config in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV): + if execution_config.execution_mode in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV): if "install_deps" not in operator_args: operator_args["install_deps"] = project_config.install_dbt_deps diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index a9291445ac..9e744d5b30 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -46,6 +46,13 @@ logger = get_logger(__name__) +def _normalize_path(path: str) -> str: + """ + Converts a potentially Windows path string into a Posix-friendly path. + """ + return Path(path.replace("\\", "/")).as_posix() + + class CosmosLoadDbtException(Exception): """ Exception raised while trying to load a `dbt` project as a `DbtGraph` instance. @@ -64,6 +71,7 @@ class DbtNode: resource_type: DbtResourceType depends_on: list[str] file_path: Path + package_name: str | None = None tags: list[str] = field(default_factory=lambda: []) config: dict[str, Any] = field(default_factory=lambda: {}) has_freshness: bool = False @@ -279,12 +287,17 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, except json.decoder.JSONDecodeError: logger.debug("Skipped dbt ls line: %s", line) else: + base_path = ( + project_path.parent / node_dict["package_name"] if node_dict.get("package_name") else project_path # type: ignore + ) + try: node = DbtNode( unique_id=node_dict["unique_id"], + package_name=node_dict.get("package_name"), resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=project_path / node_dict["original_file_path"], + file_path=base_path / node_dict["original_file_path"], tags=node_dict.get("tags", []), config=node_dict.get("config", {}), has_freshness=( @@ -376,8 +389,8 @@ def _add_vars_arg(self, cmd_args: list[str]) -> None: """ Change args list in-place so they include dbt vars, if they are set. """ - if self.project.dbt_vars: - cmd_args.extend(["--vars", json.dumps(self.project.dbt_vars, sort_keys=True)]) + if self.dbt_vars: + cmd_args.extend(["--vars", json.dumps(self.dbt_vars, sort_keys=True)]) @cached_property def dbt_ls_args(self) -> list[str]: @@ -530,7 +543,7 @@ def load( self.update_node_dependency() logger.info("Total nodes: %i", len(self.nodes)) - logger.info("Total filtered nodes: %i", len(self.nodes)) + logger.info("Total filtered nodes: %i", len(self.filtered_nodes)) def run_dbt_ls( self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str] @@ -821,9 +834,10 @@ def load_from_dbt_manifest(self) -> None: for unique_id, node_dict in resources.items(): node = DbtNode( unique_id=unique_id, + package_name=node_dict.get("package_name"), resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]), + file_path=self.execution_config.project_path / _normalize_path(node_dict["original_file_path"]), tags=node_dict["tags"], config=node_dict["config"], has_freshness=( diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 0ac91e9f11..e410e21a7c 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -173,7 +173,7 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: # Index nodes by name, we can improve performance by doing this once # for multiple GraphSelectors if PATH_SELECTOR in self.node_name: - path_selection = self.node_name[len(PATH_SELECTOR) :] + path_selection = self.node_name[len(PATH_SELECTOR) :].rstrip("*") root_nodes.update({node_id for node_id, node in nodes.items() if path_selection in str(node.file_path)}) elif TAG_SELECTOR in self.node_name: @@ -366,7 +366,7 @@ def _parse_tag_selector(self, item: str) -> None: def _parse_path_selector(self, item: str) -> None: index = len(PATH_SELECTOR) if self.project_dir: - self.paths.append(self.project_dir / Path(item[index:])) + self.paths.append(self.project_dir / Path(item[index:].rstrip("*"))) else: self.paths.append(Path(item[index:])) diff --git a/cosmos/io.py b/cosmos/io.py index 0cce873e59..f065d8bf13 100644 --- a/cosmos/io.py +++ b/cosmos/io.py @@ -8,7 +8,6 @@ from cosmos import settings from cosmos.constants import DEFAULT_TARGET_PATH, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP from cosmos.exceptions import CosmosValueError -from cosmos.settings import remote_target_path, remote_target_path_conn_id def upload_to_aws_s3( @@ -136,14 +135,14 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]: """Configure the remote target path if it is provided.""" from airflow.version import version as airflow_version - if not remote_target_path: + if not settings.remote_target_path: return None, None _configured_target_path = None - target_path_str = str(remote_target_path) + target_path_str = str(settings.remote_target_path) - remote_conn_id = remote_target_path_conn_id + remote_conn_id = settings.remote_target_path_conn_id if not remote_conn_id: target_path_schema = urlparse(target_path_str).scheme remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None) # type: ignore[assignment] diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index 72e2856d96..60c2ac6c0a 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -5,7 +5,15 @@ from typing import TYPE_CHECKING, Any, Sequence import airflow -from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator + +try: + from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +except ImportError: + raise ImportError( + "Could not import BigQueryInsertJobOperator. Ensure you've installed the Google Cloud provider separately or " + "with with `pip install apache-airflow-providers-google`." + ) + from airflow.utils.context import Context from airflow.utils.session import NEW_SESSION, provide_session from packaging.version import Version @@ -15,7 +23,7 @@ from cosmos.dataset import get_dataset_alias_name from cosmos.exceptions import CosmosValueError from cosmos.operators.local import AbstractDbtLocalBase -from cosmos.settings import enable_setup_async_task, remote_target_path, remote_target_path_conn_id +from cosmos.settings import remote_target_path, remote_target_path_conn_id if TYPE_CHECKING: # pragma: no cover from sqlalchemy.orm import Session @@ -28,7 +36,11 @@ def _mock_bigquery_adapter() -> None: import agate from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager - from dbt_common.clients.agate_helper import empty_table + + try: + from dbt_common.clients.agate_helper import empty_table + except (ModuleNotFoundError, ImportError): # pragma: no cover + from dbt.clients.agate_helper import empty_table def execute( # type: ignore[no-untyped-def] self, sql, auto_begin=False, fetch=None, limit: Optional[int] = None @@ -69,9 +81,6 @@ def __init__( self.project_dir = project_dir self.profile_config = profile_config self.gcp_conn_id = self.profile_config.profile_mapping.conn_id # type: ignore - profile = self.profile_config.profile_mapping.profile # type: ignore - self.gcp_project = profile["project"] - self.dataset = profile["dataset"] self.extra_context = extra_context or {} self.configuration: dict[str, Any] = {} self.dbt_kwargs = dbt_kwargs or {} @@ -99,6 +108,8 @@ def __init__( self.async_context["profile_type"] = self.profile_config.get_profile_type() self.async_context["async_operator"] = BigQueryInsertJobOperator self.compiled_sql = "" + self.gcp_project = "" + self.dataset = "" @property def base_cmd(self) -> list[str]: @@ -131,7 +142,7 @@ def get_remote_sql(self) -> str: return sql # type: ignore def execute(self, context: Context, **kwargs: Any) -> None: - if enable_setup_async_task: + if settings.enable_setup_async_task: self.configuration = { "query": { "query": self.get_remote_sql(), @@ -141,20 +152,29 @@ def execute(self, context: Context, **kwargs: Any) -> None: super().execute(context=context) else: self.build_and_run_cmd(context=context, run_as_async=True, async_context=self.async_context) - self._store_compiled_sql(context=context) + self._store_template_fields(context=context) @provide_session - def _store_compiled_sql(self, context: Context, session: Session = NEW_SESSION) -> None: + def _store_template_fields(self, context: Context, session: Session = NEW_SESSION) -> None: from airflow.models.renderedtifields import RenderedTaskInstanceFields from airflow.models.taskinstance import TaskInstance - if not enable_setup_async_task: + if not settings.enable_setup_async_task: self.log.info("SQL cannot be made available, skipping registration of compiled_sql template field") return sql = self.get_remote_sql().strip() self.log.debug("Executed SQL is: %s", sql) self.compiled_sql = sql + if self.profile_config.profile_mapping is not None: + profile = self.profile_config.profile_mapping.profile + else: + raise CosmosValueError( + "The `profile_config.profile`_mapping attribute must be defined to use `ExecutionMode.AIRFLOW_ASYNC`" + ) + self.gcp_project = profile["project"] + self.dataset = profile["dataset"] + # need to refresh the rendered task field record in the db because Airflow only does this # before executing the task, not after ti = context["ti"] @@ -184,5 +204,5 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> Any: """ job_id = super().execute_complete(context=context, event=event) self.log.info("Configuration is %s", str(self.configuration)) - self._store_compiled_sql(context=context) + self._store_template_fields(context=context) return job_id diff --git a/cosmos/operators/aws_ecs.py b/cosmos/operators/aws_ecs.py index 2decbcc6c4..98061269ff 100644 --- a/cosmos/operators/aws_ecs.py +++ b/cosmos/operators/aws_ecs.py @@ -21,6 +21,8 @@ logger = get_logger(__name__) +DEFAULT_CONN_ID = "aws_default" +DEFAULT_CONTAINER_NAME = "dbt" DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {} try: @@ -47,10 +49,10 @@ class DbtAwsEcsBaseOperator(AbstractDbtBase, EcsRunTaskOperator): # type: ignor def __init__( self, # arguments required by EcsRunTaskOperator - aws_conn_id: str, cluster: str, task_definition: str, - container_name: str, + container_name: str = DEFAULT_CONTAINER_NAME, + aws_conn_id: str = DEFAULT_CONN_ID, # profile_config: ProfileConfig | None = None, command: list[str] | None = None, @@ -66,6 +68,7 @@ def __init__( "aws_conn_id": aws_conn_id, "task_definition": task_definition, "cluster": cluster, + "container_name": container_name, "overrides": None, } ) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 18019ab92b..552172f746 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -1,5 +1,6 @@ from __future__ import annotations +import inspect import logging import os from abc import ABCMeta, abstractmethod @@ -142,6 +143,26 @@ def __init__( self.extra_context = extra_context or {} kwargs.pop("full_refresh", None) # usage of this param should be implemented in child classes + # The following is necessary so that dynamic mapped classes work since Cosmos 1.9.0 subclass changes + # Bug report: https://github.com/astronomer/astronomer-cosmos/issues/1546 + __init__._BaseOperatorMeta__param_names = { # type: ignore + name + for (name, param) in inspect.signature(__init__).parameters.items() + if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD) + } + + def __init_subclass__(cls) -> None: + super().__init_subclass__() + # The following is necessary so that dynamic mapped classes work since Cosmos 1.9.0 subclass changes + # Since this class is subclassed by all Cosmos operators, to do this here allows to avoid to have this + # logic explicitly in all subclasses + # Bug report: https://github.com/astronomer/astronomer-cosmos/issues/1546 + cls.__init__._BaseOperatorMeta__param_names = { # type: ignore + name + for (name, param) in inspect.signature(cls.__init__).parameters.items() + if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD) + } + def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]: """ Builds the set of environment variables to be exposed for the bash command. diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 21fa6ae915..7559552f30 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -37,8 +37,6 @@ from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file from cosmos.exceptions import AirflowCompatibilityError, CosmosDbtRunError, CosmosValueError from cosmos.settings import ( - enable_setup_async_task, - enable_teardown_async_task, remote_target_path, remote_target_path_conn_id, ) @@ -406,8 +404,10 @@ def _cache_package_lockfile(self, tmp_project_dir: Path) -> None: _copy_cached_package_lockfile_to_project(latest_package_lockfile, tmp_project_dir) def _read_run_sql_from_target_dir(self, tmp_project_dir: str, sql_context: dict[str, Any]) -> str: - sql_relative_path = sql_context["dbt_node_config"]["file_path"].split(str(self.project_dir))[-1].lstrip("/") - run_sql_path = Path(tmp_project_dir) / "target/run" / Path(self.project_dir).name / sql_relative_path + package_name = sql_context.get("package_name") or Path(self.project_dir).name + sql_relative_path = sql_context["dbt_node_config"]["file_path"].split(package_name)[-1].lstrip("/") + run_sql_path = Path(tmp_project_dir) / "target/run" / Path(package_name).name / sql_relative_path + with run_sql_path.open("r") as sql_file: sql_content: str = sql_file.read() return sql_content @@ -483,11 +483,11 @@ def _handle_post_execution(self, tmp_project_dir: str, context: Context) -> None self.callback(tmp_project_dir, **self.callback_args) def _handle_async_execution(self, tmp_project_dir: str, context: Context, async_context: dict[str, Any]) -> None: - if async_context.get("teardown_task") and enable_teardown_async_task: + if async_context.get("teardown_task") and settings.enable_teardown_async_task: self._delete_sql_files(Path(tmp_project_dir), "run") return - if enable_setup_async_task: + if settings.enable_setup_async_task: self._upload_sql_files(tmp_project_dir, "run") else: sql = self._read_run_sql_from_target_dir(tmp_project_dir, async_context) @@ -531,7 +531,7 @@ def run_command( if self.install_deps: self._install_dependencies(tmp_dir_path, flags, env) - if run_as_async and not enable_setup_async_task: + if run_as_async and not settings.enable_setup_async_task: self._mock_dbt_adapter(async_context) full_cmd = cmd + flags @@ -762,8 +762,36 @@ class DbtBuildLocalOperator(DbtBuildMixin, DbtLocalBaseOperator): template_fields: Sequence[str] = DbtLocalBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] - def __init__(self, *args: Any, **kwargs: Any) -> None: + def __init__(self, *args: Any, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + self.on_warning_callback = on_warning_callback + self.extract_issues: Callable[..., tuple[list[str], list[str]]] + + def _handle_warnings(self, result: FullOutputSubprocessResult | dbtRunnerResult, context: Context) -> None: + """ + Handles warnings by extracting log issues, creating additional context, and calling the + on_warning_callback with the updated context. + + :param result: The result object from the build and run command. + :param context: The original airflow context in which the build and run command was executed. + """ + if self.invocation_mode == InvocationMode.SUBPROCESS: + self.extract_issues = extract_freshness_warn_msg + elif self.invocation_mode == InvocationMode.DBT_RUNNER: + self.extract_issues = dbt_runner.extract_message_by_status + + test_names, test_results = self.extract_issues(result) + + warning_context = dict(context) + warning_context["test_names"] = test_names + warning_context["test_results"] = test_results + + self.on_warning_callback and self.on_warning_callback(warning_context) + + def execute(self, context: Context, **kwargs: Any) -> None: + result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags()) + if self.on_warning_callback: + self._handle_warnings(result, context) class DbtLSLocalOperator(DbtLSMixin, DbtLocalBaseOperator): diff --git a/dev/dags/dbt/simple/dbt_project.yml b/dev/dags/dbt/simple/dbt_project.yml new file mode 100644 index 0000000000..4fa380408b --- /dev/null +++ b/dev/dags/dbt/simple/dbt_project.yml @@ -0,0 +1,8 @@ +name: 'my_dbt_project' +version: '1.0.0' +profile: 'default' + +models: + my_dbt_project: + example: + materialized: table diff --git a/dev/dags/dbt/simple/models/example_model.sql b/dev/dags/dbt/simple/models/example_model.sql new file mode 100644 index 0000000000..583e918924 --- /dev/null +++ b/dev/dags/dbt/simple/models/example_model.sql @@ -0,0 +1 @@ +SELECT 1 AS id, 'example' AS name diff --git a/dev/dags/dbt/simple/profiles.yml b/dev/dags/dbt/simple/profiles.yml new file mode 100644 index 0000000000..224f565f4a --- /dev/null +++ b/dev/dags/dbt/simple/profiles.yml @@ -0,0 +1,12 @@ +default: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: "{{ env_var('POSTGRES_PORT') | int }}" + dbname: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + threads: 4 diff --git a/dev/dags/example_cosmos_dbt_build.py b/dev/dags/example_cosmos_dbt_build.py index 57ad8340f2..1016d62129 100644 --- a/dev/dags/example_cosmos_dbt_build.py +++ b/dev/dags/example_cosmos_dbt_build.py @@ -6,8 +6,7 @@ from datetime import datetime from pathlib import Path -from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import TestBehavior +from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig, TestBehavior from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" diff --git a/dev/dags/example_task_mapping.py b/dev/dags/example_task_mapping.py new file mode 100644 index 0000000000..15da345e7b --- /dev/null +++ b/dev/dags/example_task_mapping.py @@ -0,0 +1,39 @@ +import os +from datetime import datetime +from pathlib import Path + +from airflow import DAG + +from cosmos.config import ProfileConfig +from cosmos.operators.local import DbtRunLocalOperator +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) + + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + +# Define the DAG +with DAG( + dag_id="example_task_mapping", + start_date=datetime(2024, 1, 1), + schedule_interval=None, + catchup=False, +) as dag: + + dbt_partial = DbtRunLocalOperator.partial( + task_id="dbt_run", project_dir=DBT_ROOT_PATH / "simple", profile_config=profile_config, emit_datasets=False + ) + + dbt_run = dbt_partial.expand(select=["example_model"]) # Only run the specific model + + dbt_run diff --git a/dev/dags/simple_dag_async.py b/dev/dags/simple_dag_async.py index 23e9836cff..d9afba493d 100644 --- a/dev/dags/simple_dag_async.py +++ b/dev/dags/simple_dag_async.py @@ -3,11 +3,14 @@ from pathlib import Path from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.constants import TestBehavior from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) +DBT_ADAPTER_VERSION = os.getenv("DBT_ADAPTER_VERSION", "1.9") + profile_config = ProfileConfig( profile_name="default", target_name="dev", @@ -26,12 +29,9 @@ profile_config=profile_config, execution_config=ExecutionConfig( execution_mode=ExecutionMode.AIRFLOW_ASYNC, - async_py_requirements=["dbt-bigquery"], - ), - render_config=RenderConfig( - select=["path:models"], - # test_behavior=TestBehavior.NONE + async_py_requirements=[f"dbt-bigquery=={DBT_ADAPTER_VERSION}"], ), + render_config=RenderConfig(select=["path:models"], test_behavior=TestBehavior.NONE), # normal dag parameters schedule_interval=None, start_date=datetime(2023, 1, 1), diff --git a/docs/getting_started/mwaa.rst b/docs/getting_started/mwaa.rst index 8555cde344..7c4dc88ad2 100644 --- a/docs/getting_started/mwaa.rst +++ b/docs/getting_started/mwaa.rst @@ -91,7 +91,6 @@ In your ``my_cosmos_dag.py`` file, import the ``DbtDag`` class from Cosmos and c from datetime import datetime from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig from cosmos.profiles import PostgresUserPasswordProfileMapping - from cosmos.constants import ExecutionMode profile_config = ProfileConfig( profile_name="default", diff --git a/pyproject.toml b/pyproject.toml index 58262f1bd8..24a00c4962 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,7 +74,9 @@ openlineage = ["openlineage-integration-common!=1.15.0", "openlineage-airflow"] amazon = [ "apache-airflow-providers-amazon[s3fs]>=3.0.0", ] -google = ["apache-airflow-providers-google>=10.17.0"] + +# Due to issue https://github.com/fsspec/gcsfs/issues/664 +google = ["apache-airflow-providers-google>=10.17.0", "gcsfs<2025.3.0"] microsoft = ["apache-airflow-providers-microsoft-azure>=8.5.0"] all = [ "astronomer-cosmos[dbt-all]", @@ -153,12 +155,14 @@ dependencies = [ "types-python-dateutil", "Werkzeug<3.0.0", "dbt-core", + "methodtools", ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] [[tool.hatch.envs.tests.matrix]] python = ["3.8", "3.9", "3.10", "3.11", "3.12"] airflow = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] +dbt = ["1.5", "1.6", "1.7", "1.8", "1.9"] [tool.hatch.envs.tests.overrides] matrix.airflow.dependencies = [ @@ -173,6 +177,7 @@ test-integration = 'sh scripts/test/integration.sh' test-kubernetes = "sh scripts/test/integration-kubernetes.sh" test-kubernetes-setup = "sh scripts/test/kubernetes-setup.sh" test-integration-dbt-1-5-4 = 'sh scripts/test/integration-dbt-1-5-4.sh' +test-integration-dbt-async = 'sh scripts/test/integration-dbt-async.sh {matrix:dbt}' test-integration-expensive = 'sh scripts/test/integration-expensive.sh' test-integration-setup = 'sh scripts/test/integration-setup.sh' test-performance = 'sh scripts/test/performance.sh' diff --git a/scripts/test/integration-dbt-async.sh b/scripts/test/integration-dbt-async.sh new file mode 100644 index 0000000000..b4ff9e9195 --- /dev/null +++ b/scripts/test/integration-dbt-async.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +set -x +set -e + +DBT_VERSION="$1" +echo "DBT_VERSION:" +echo "$DBT_VERSION" + + +pip uninstall dbt-adapters dbt-common dbt-core dbt-extractor dbt-postgres dbt-semantic-interfaces -y +pip install "dbt-postgres==$DBT_VERSION" "dbt-databricks==$DBT_VERSION" "dbt-bigquery==$DBT_VERSION" +export SOURCE_RENDERING_BEHAVIOR=all +rm -rf airflow.*; \ +airflow db init; \ + +if [ "$DBT_VERSION" = "1.7" ]; then + # Otherwise, we will get the following error: + # stderr: MessageToJson() got an unexpected keyword argument 'including_default_value_fields' + echo "DBT version is 1.7 — Installing protobuf==4.25.6..." + pip install protobuf==4.25.6 +fi + +rm -rf dbt/jaffle_shop/dbt_packages; +pytest -vv \ + --cov=cosmos \ + --cov-report=term-missing \ + --cov-report=xml \ + "tests/test_async_example_dag.py::test_example_dag[simple_dag_async]" diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index 985c99df57..bf19b4248f 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -19,5 +19,6 @@ pytest -vv \ --cov-report=xml \ -m 'integration' \ --ignore=tests/perf \ + --ignore=tests/test_async_example_dag.py \ --ignore=tests/test_example_k8s_dags.py \ -k 'not ( example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes)' diff --git a/scripts/test/performance.sh b/scripts/test/performance.sh index 2023026d3b..97941e86b7 100644 --- a/scripts/test/performance.sh +++ b/scripts/test/performance.sh @@ -2,5 +2,6 @@ pytest -vv \ -s \ -m 'perf' \ --ignore=tests/test_example_dags.py \ + --ignore=tests/test_async_example_dag.py \ --ignore=tests/test_example_dags_no_connections.py \ --ignore=tests/test_example_k8s_dags.py diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index b9ee15eba9..edc87c9dc0 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -30,6 +30,9 @@ uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.t uv pip install apache-airflow-providers-docker --constraint /tmp/constraint.txt uv pip install apache-airflow-providers-postgres --constraint /tmp/constraint.txt +# Due to issue https://github.com/fsspec/gcsfs/issues/664 +uv pip install "gcsfs<2025.3.0" + if [ "$AIRFLOW_VERSION" = "2.4" ] || [ "$AIRFLOW_VERSION" = "2.5" ] || [ "$AIRFLOW_VERSION" = "2.6" ] ; then uv pip install "apache-airflow-providers-amazon" "apache-airflow==$AIRFLOW_VERSION" "urllib3<2" uv pip install "apache-airflow-providers-cncf-kubernetes" "apache-airflow==$AIRFLOW_VERSION" diff --git a/scripts/test/unit-cov.sh b/scripts/test/unit-cov.sh index 8d8fe3589a..50cd268c40 100644 --- a/scripts/test/unit-cov.sh +++ b/scripts/test/unit-cov.sh @@ -6,5 +6,6 @@ pytest \ -m "not (integration or perf)" \ --ignore=tests/perf \ --ignore=tests/test_example_dags.py \ + --ignore=tests/test_async_example_dag.py \ --ignore=tests/test_example_dags_no_connections.py \ --ignore=tests/test_example_k8s_dags.py diff --git a/scripts/test/unit.sh b/scripts/test/unit.sh index b80aab72ea..39069c423a 100644 --- a/scripts/test/unit.sh +++ b/scripts/test/unit.sh @@ -3,5 +3,6 @@ pytest \ -m "not (integration or perf)" \ --ignore=tests/perf \ --ignore=tests/test_example_dags.py \ + --ignore=tests/test_async_example_dag.py \ --ignore=tests/test_example_dags_no_connections.py \ --ignore=tests/test_example_k8s_dags.py diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index a3d9474dcd..728b86283b 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -435,6 +435,7 @@ def test_create_task_metadata_unsupported(caplog): "resource_name": "my_model", "name": "my_model", }, + "package_name": None, }, ), ( @@ -476,6 +477,7 @@ def test_create_task_metadata_unsupported(caplog): "resource_name": "my_snapshot", "name": "my_snapshot", }, + "package_name": None, }, ), ], diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 4e8b896519..5d79675646 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -30,6 +30,7 @@ DbtGraph, DbtNode, LoadMode, + _normalize_path, parse_dbt_ls_output, run_command, ) @@ -40,6 +41,7 @@ DBT_PROJECT_NAME = "jaffle_shop" ALTERED_DBT_PROJECT_NAME = "altered_jaffle_shop" SAMPLE_MANIFEST = Path(__file__).parent.parent / "sample/manifest.json" +SAMPLE_SMALL_MANIFEST = Path(__file__).parent.parent / "sample/small_manifest.json" SAMPLE_MANIFEST_PY = Path(__file__).parent.parent / "sample/manifest_python.json" SAMPLE_MANIFEST_MODEL_VERSION = Path(__file__).parent.parent / "sample/manifest_model_version.json" SAMPLE_MANIFEST_SOURCE = Path(__file__).parent.parent / "sample/manifest_source.json" @@ -262,6 +264,31 @@ def test_load_via_manifest_with_exclude(project_name, manifest_filepath, model_f assert sample_node.file_path == DBT_PROJECTS_ROOT_DIR / f"{project_name}/models/{model_filepath}" +def test_load_via_manifest_with_ms_windows_manifest_and_star_selector(): + # This test is based on a real user-case that in 1.9.0 and before would return an empty list of filtered nodes + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR, # this value is not used in DAG rendering when the manifest is given + manifest_path=SAMPLE_SMALL_MANIFEST, + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + render_config = RenderConfig(select=["path:models/edr*+"]) + execution_config = ExecutionConfig(dbt_project_path=project_config.dbt_project_path) + dbt_graph = DbtGraph( + project=project_config, + execution_config=execution_config, + profile_config=profile_config, + render_config=render_config, + ) + dbt_graph.load_from_dbt_manifest() + + assert len(dbt_graph.nodes) == 1 + assert len(dbt_graph.filtered_nodes) == 1 + + @pytest.mark.parametrize( "project_name,manifest_filepath,model_filepath", [(DBT_PROJECT_NAME, SAMPLE_MANIFEST, "customers.sql"), ("jaffle_shop_python", SAMPLE_MANIFEST_PY, "customers.py")], @@ -1349,7 +1376,7 @@ def test_parse_dbt_ls_output_real_life_customer_bug(caplog): "model.some_package.some_model": DbtNode( unique_id="model.some_package.some_model", resource_type=DbtResourceType.MODEL, - file_path=Path("fake-project/models/some_model.sql"), + file_path=Path("some_package/models/some_model.sql"), tags=[], config={ "access": "protected", @@ -1383,6 +1410,7 @@ def test_parse_dbt_ls_output_real_life_customer_bug(caplog): "unique_key": None, }, depends_on=["source.some_source"], + package_name="some_package", ), } nodes = parse_dbt_ls_output(Path("fake-project"), dbt_ls_output) @@ -1392,7 +1420,7 @@ def test_parse_dbt_ls_output_real_life_customer_bug(caplog): def test_parse_dbt_ls_output(): - fake_ls_stdout = '{"resource_type": "model", "name": "fake-name", "original_file_path": "fake-file-path.sql", "unique_id": "fake-unique-id", "tags": [], "config": {}}' + fake_ls_stdout = '{"resource_type": "model", "name": "fake-name", "package_name": "fake-project", "original_file_path": "fake-file-path.sql", "unique_id": "fake-unique-id", "tags": [], "config": {}}' expected_nodes = { "fake-unique-id": DbtNode( @@ -1402,6 +1430,7 @@ def test_parse_dbt_ls_output(): tags=[], config={}, depends_on=[], + package_name="fake-project", ), } nodes = parse_dbt_ls_output(Path("fake-project"), fake_ls_stdout) @@ -1410,7 +1439,7 @@ def test_parse_dbt_ls_output(): def test_parse_dbt_ls_output_with_json_without_tags_or_config(): - some_ls_stdout = '{"resource_type": "model", "name": "some-name", "original_file_path": "some-file-path.sql", "unique_id": "some-unique-id", "config": {}}' + some_ls_stdout = '{"resource_type": "model", "name": "some-name", "package_name": "some-project", "original_file_path": "some-file-path.sql", "unique_id": "some-unique-id", "config": {}}' expected_nodes = { "some-unique-id": DbtNode( @@ -1420,6 +1449,7 @@ def test_parse_dbt_ls_output_with_json_without_tags_or_config(): tags=[], config={}, depends_on=[], + package_name="some-project", ), } nodes = parse_dbt_ls_output(Path("some-project"), some_ls_stdout) @@ -1508,8 +1538,7 @@ def test_load_via_dbt_ls_project_config_dbt_vars( """Tests that the dbt ls command in the subprocess has "--vars" with the project config dbt_vars.""" mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 - dbt_vars = {"my_var1": "my_value1", "my_var2": "my_value2"} - project_config = ProjectConfig(dbt_vars=dbt_vars) + project_config = ProjectConfig(dbt_vars={"my_var1": "my_value1", "my_var2": "my_value2"}) render_config = RenderConfig( dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, @@ -1528,8 +1557,42 @@ def test_load_via_dbt_ls_project_config_dbt_vars( ) dbt_graph.load_via_dbt_ls() ls_command = mock_popen.call_args.args[0] + assert "--vars" not in ls_command + + +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) +@patch("cosmos.dbt.graph.Popen") +@patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") +@patch("cosmos.config.RenderConfig.validate_dbt_command") +@patch.dict(sys.modules, {"dbt.cli.main": None}) +def test_load_via_dbt_ls_dbt_graph_dbt_vars( + mock_validate, mock_update_nodes, mock_popen, mock_use_case, tmp_dbt_project_dir +): + """Tests that the dbt ls command in the subprocess has "--vars" with the DbtGraph dbt_vars.""" + mock_popen().communicate.return_value = ("", "") + mock_popen().returncode = 0 + dbt_vars = {"my_var3": "my_value3"} + render_config = RenderConfig( + dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + dbt_graph = DbtGraph( + project=ProjectConfig(), + render_config=render_config, + execution_config=execution_config, + profile_config=profile_config, + dbt_vars=dbt_vars, + ) + dbt_graph.load_via_dbt_ls() + ls_command = mock_popen.call_args.args[0] assert "--vars" in ls_command - assert ls_command[ls_command.index("--vars") + 1] == '{"my_var1": "my_value1", "my_var2": "my_value2"}' + assert ls_command[ls_command.index("--vars") + 1] == json.dumps(dbt_vars, sort_keys=True) @patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) @@ -1721,21 +1784,23 @@ def test_project_path_fails(): @pytest.mark.parametrize( - "render_config,project_config,expected_dbt_ls_args", + "render_config,project_config,dbt_vars,expected_dbt_ls_args", [ - (RenderConfig(), ProjectConfig(), []), - (RenderConfig(exclude=["package:snowplow"]), ProjectConfig(), ["--exclude", "package:snowplow"]), + (RenderConfig(), ProjectConfig(), None, []), + (RenderConfig(exclude=["package:snowplow"]), ProjectConfig(), None, ["--exclude", "package:snowplow"]), ( RenderConfig(select=["tag:prod", "config.materialized:incremental"]), ProjectConfig(), + None, ["--select", "tag:prod", "config.materialized:incremental"], ), - (RenderConfig(selector="nightly"), ProjectConfig(), ["--selector", "nightly"]), - (RenderConfig(), ProjectConfig(dbt_vars={"a": 1}), ["--vars", '{"a": 1}']), - (RenderConfig(), ProjectConfig(partial_parse=False), ["--no-partial-parse"]), + (RenderConfig(selector="nightly"), ProjectConfig(), None, ["--selector", "nightly"]), + (RenderConfig(), ProjectConfig(dbt_vars={"a": 1}), {"k": "v"}, ["--vars", '{"k": "v"}']), + (RenderConfig(), ProjectConfig(partial_parse=False), None, ["--no-partial-parse"]), ( RenderConfig(exclude=["1", "2"], select=["a", "b"], selector="nightly"), ProjectConfig(dbt_vars={"a": 1}, partial_parse=False), + {"k": "v"}, [ "--exclude", "1", @@ -1744,7 +1809,7 @@ def test_project_path_fails(): "a", "b", "--vars", - '{"a": 1}', + '{"k": "v"}', "--selector", "nightly", "--no-partial-parse", @@ -1752,10 +1817,11 @@ def test_project_path_fails(): ), ], ) -def test_dbt_ls_args(render_config, project_config, expected_dbt_ls_args): +def test_dbt_ls_args(render_config, project_config, dbt_vars, expected_dbt_ls_args): graph = DbtGraph( project=project_config, render_config=render_config, + dbt_vars=dbt_vars, ) assert graph.dbt_ls_args == expected_dbt_ls_args @@ -1768,8 +1834,8 @@ def test_dbt_ls_cache_key_args_sorts_envvars(): @patch("cosmos.dbt.graph.run_command") def test_run_dbt_deps(run_command_mock): - project_config = ProjectConfig(dbt_vars={"var-key": "var-value"}) - graph = DbtGraph(project=project_config) + project_config = ProjectConfig() + graph = DbtGraph(project=project_config, dbt_vars={"var-key": "var-value"}) graph.local_flags = [] graph.run_dbt_deps("dbt", "/some/path", {}) run_command_mock.assert_called_with( @@ -1815,7 +1881,7 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir hash_dir, hash_args = version.split(",") assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": - assert hash_dir == "afbced719302d5b1efdfb191c617e349" + assert hash_dir == "391db5c7e1fb90214d829dd0476059a1" else: assert hash_dir == "0148da6f5f7fd260c9fa55c3b3c45168" @@ -1987,3 +2053,13 @@ def test_get_dbt_ls_cache_remote_cache_dir( } assert result == expected_result + + +def test__normalize_path(): + """ + This normalizes the path (e.g. declared inside a manifest.json file) when it was created using MS Windows instead + of GNU Linux. + """ + original_value = "seeds\\seed_ifs_util_manual_event_id.csv" + expected_value = "seeds/seed_ifs_util_manual_event_id.csv" + assert _normalize_path(original_value) == expected_value diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index ef82c5881e..7cef7cc16d 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -284,12 +284,31 @@ def test_select_nodes_with_test_by_intersection_and_tag_ancestry(): def test_select_nodes_by_select_path(): + # Path without star or graph selector selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models"]) expected = { parent_node.unique_id: parent_node, } assert selected == expected + # Path with star + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models/*"]) + expected = { + parent_node.unique_id: parent_node, + } + assert selected == expected + + # Path with star and graph selector that retrieves descendants + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models/*+"]) + expected = { + child_node.unique_id: child_node, + parent_node.unique_id: parent_node, + sibling1_node.unique_id: sibling1_node, + sibling2_node.unique_id: sibling2_node, + sibling3_node.unique_id: sibling3_node, + } + assert selected == expected + def test_select_nodes_with_slash_but_no_path_selector(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["gen2/models"]) diff --git a/tests/operators/_asynchronous/test_bigquery.py b/tests/operators/_asynchronous/test_bigquery.py index f339c98805..783fa53e95 100644 --- a/tests/operators/_asynchronous/test_bigquery.py +++ b/tests/operators/_asynchronous/test_bigquery.py @@ -38,8 +38,6 @@ def test_dbt_run_airflow_async_bigquery_operator_init(profile_config_mock): assert operator.project_dir == "/path/to/project" assert operator.profile_config == profile_config_mock assert operator.gcp_conn_id == "google_cloud_default" - assert operator.gcp_project == "test_project" - assert operator.dataset == "test_dataset" def test_dbt_run_airflow_async_bigquery_operator_base_cmd(profile_config_mock): @@ -54,9 +52,9 @@ def test_dbt_run_airflow_async_bigquery_operator_base_cmd(profile_config_mock): @patch.object(DbtRunAirflowAsyncBigqueryOperator, "build_and_run_cmd") -def test_dbt_run_airflow_async_bigquery_operator_execute(mock_build_and_run_cmd, profile_config_mock, monkeypatch): +@patch("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) +def test_dbt_run_airflow_async_bigquery_operator_execute(mock_build_and_run_cmd, profile_config_mock): """Test execute calls build_and_run_cmd with correct parameters.""" - monkeypatch.setattr("cosmos.operators._asynchronous.bigquery.enable_setup_async_task", False) operator = DbtRunAirflowAsyncBigqueryOperator( task_id="test_task", project_dir="/path/to/project", @@ -134,15 +132,19 @@ def test_store_compiled_sql(mock_rendered_ti, mock_get_remote_sql, profile_confi mock_task_instance.task = operator mock_context = {"ti": mock_task_instance} - operator._store_compiled_sql(mock_context, session=mock_session) + operator._store_template_fields(mock_context, session=mock_session) + # check if gcp_project and dataset are set after the tasks gets executed assert operator.compiled_sql == "SELECT * FROM test_table;" + assert operator.dataset == "test_dataset" + assert operator.gcp_project == "test_project" + mock_rendered_ti.assert_called_once() mock_session.add.assert_called_once() mock_session.query().filter().delete.assert_called_once() -@patch("cosmos.operators._asynchronous.bigquery.DbtRunAirflowAsyncBigqueryOperator._store_compiled_sql") +@patch("cosmos.operators._asynchronous.bigquery.DbtRunAirflowAsyncBigqueryOperator._store_template_fields") def test_execute_complete(mock_store_sql, profile_config_mock): mock_context = Mock() mock_event = {"job_id": "test_job"} diff --git a/tests/operators/test_aws_ecs.py b/tests/operators/test_aws_ecs.py index 230c1616a2..af9ed226d4 100644 --- a/tests/operators/test_aws_ecs.py +++ b/tests/operators/test_aws_ecs.py @@ -185,6 +185,7 @@ def test_dbt_aes_ecs_overrides_parameter(): assert "containerOverrides" in actual_overrides actual_container_overrides = actual_overrides["containerOverrides"][0] + assert actual_container_overrides["name"] == "my-dbt-container-name" assert isinstance(actual_container_overrides["command"], list), "`command` should be of type list" assert "environment" in actual_container_overrides diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 3bcd78616d..bbafd0fda2 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -5,7 +5,7 @@ import sys import tempfile from pathlib import Path -from unittest.mock import MagicMock, call, patch +from unittest.mock import MagicMock, call, mock_open, patch import pytest from airflow import DAG @@ -664,7 +664,17 @@ def test_run_test_operator_with_callback(invocation_mode, failing_test_dbt_proje on_warning_callback=on_warning_callback, invocation_mode=invocation_mode, ) - run_operator >> test_operator + + build_operator = DbtBuildLocalOperator( + profile_config=mini_profile_config, + project_dir=failing_test_dbt_project, + task_id="build", + append_env=True, + on_warning_callback=on_warning_callback, + invocation_mode=invocation_mode, + ) + + run_operator >> build_operator >> test_operator run_test_dag(dag) assert on_warning_callback.called @@ -1431,10 +1441,10 @@ def test_mock_dbt_adapter_unsupported_profile_type(): @patch("airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator.execute") @patch("cosmos.operators.local.AbstractDbtLocalBase._read_run_sql_from_target_dir") -def test_async_execution_without_start_task(mock_read_sql, mock_bq_execute, monkeypatch): +@patch("cosmos.operators.local.settings.enable_setup_async_task", False) +def test_async_execution_without_start_task(mock_read_sql, mock_bq_execute): from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator - monkeypatch.setattr("cosmos.operators.local.enable_setup_async_task", False) mock_read_sql.return_value = "select * from 1;" operator = DbtRunLocalOperator( task_id="test", @@ -1447,6 +1457,7 @@ def test_async_execution_without_start_task(mock_read_sql, mock_bq_execute, monk mock_bq_execute.assert_called_once() +@pytest.mark.integration @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("pathlib.Path.rglob") @patch("cosmos.operators.local.AbstractDbtLocalBase._construct_dest_file_path") @@ -1464,3 +1475,19 @@ def test_async_execution_teardown_delete_files(mock_unlink, mock_construct_dest_ ) operator._handle_async_execution(project_dir, {}, {"profile_type": "bigquery", "teardown_task": True}) mock_unlink.assert_called() + + +def test_read_run_sql_from_target_dir(): + tmp_project_dir = "/tmp/project" + sql_context = {"dbt_node_config": {"file_path": "/path/to/file.sql"}, "package_name": "package_name"} + + operator = DbtRunLocalOperator( + task_id="test", + project_dir="/tmp", + profile_config=profile_config, + ) + + expected_sql_content = "SELECT * FROM my_table;" + with patch("pathlib.Path.open", new_callable=mock_open, read_data=expected_sql_content): + result = operator._read_run_sql_from_target_dir(tmp_project_dir, sql_context) + assert result == expected_sql_content diff --git a/tests/sample/small_manifest.json b/tests/sample/small_manifest.json new file mode 100644 index 0000000000..ee0448bdc6 --- /dev/null +++ b/tests/sample/small_manifest.json @@ -0,0 +1,27 @@ +{ + "nodes": { + "model.elementary.model_run_results": { + "alias": "model_run_results", + "checksum": { + "checksum": "99fba47e42516fbdc31e2546f687e3f780f21eaa5ac303e1ed22d23262ac5ec9", + "name": "sha256" + }, + "config": {}, + "database": "FDH_DEV_DB", + "fqn": [ + "elementary", + "edr", + "run_results", + "model_run_results" + ], + "name": "model_run_results", + "original_file_path": "models\\edr\\run_results\\model_run_results.sql", + "package_name": "elementary", + "path": "edr\\run_results\\model_run_results.sql", + "resource_type": "model", + "schema": "MONITORING", + "tags": [], + "unique_id": "model.elementary.model_run_results" + } + } +} diff --git a/tests/test_async_example_dag.py b/tests/test_async_example_dag.py new file mode 100644 index 0000000000..5d70af6a64 --- /dev/null +++ b/tests/test_async_example_dag.py @@ -0,0 +1,73 @@ +# We already have tests/test_example_dags.py, but it doesn’t run against multiple dbt versions in CI. +# Some dbt versions have shown parsing issues with certain example DAGs — something we may need to address over time. +# With PR #1535, the goal is to test the async example DAG across multiple dbt versions. To prevent the CI job from +# failing early due to unrelated DAG parsing errors, PR #1535 introduces this new test_async_example_dag.py file. +# This file replicates tests/test_example_dags.py but excludes all DAGs except simple_async_dag by adding them to +# .airflowignore. This ensures the CI job focuses solely on testing simple_async_dag over multiple dbt versions +# without being disrupted by other DAG parsing issues. + +from __future__ import annotations + +from pathlib import Path + +try: + from functools import cache +except ImportError: + from functools import lru_cache as cache + + +import airflow +import pytest +from airflow.models.dagbag import DagBag +from airflow.utils.db import create_default_connections +from airflow.utils.session import provide_session +from packaging.version import Version + +EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags" +ALL_FILES_TO_IGNORE = [ + f.name for f in EXAMPLE_DAGS_DIR.iterdir() if f.is_file() and f.suffix == ".py" and f.name != "simple_dag_async.py" +] + +AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" +AIRFLOW_VERSION = Version(airflow.__version__) + + +@provide_session +def get_session(session=None): + create_default_connections(session) + return session + + +@pytest.fixture() +def session(): + return get_session() + + +@cache +def get_dag_bag() -> DagBag: + """Create a DagBag by adding the files that are not supported to .airflowignore""" + + with open(AIRFLOW_IGNORE_FILE, "w+") as file: + for dagfile in ALL_FILES_TO_IGNORE: + print(f"Adding {dagfile} to .airflowignore") + file.writelines([f"{dagfile}\n"]) + + print(".airflowignore contents: ") + print(AIRFLOW_IGNORE_FILE.read_text()) + db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) + assert db.dags + assert not db.import_errors + return db + + +def get_dag_ids() -> list[str]: + dag_bag = get_dag_bag() + return dag_bag.dag_ids + + +@pytest.mark.integration +@pytest.mark.parametrize("dag_id", get_dag_ids()) +def test_example_dag(session, dag_id: str): + dag_bag = get_dag_bag() + dag = dag_bag.get_dag(dag_id) + dag.test() diff --git a/tests/test_config.py b/tests/test_config.py index d557dd4fc2..850d6358f5 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -28,6 +28,15 @@ def test_init_with_project_path_only(): assert project_config.snapshots_path == Path("path/to/dbt/project/snapshots") assert project_config.project_name == "project" assert project_config.manifest_path is None + assert project_config.install_dbt_deps is True + + +def test_init_with_project_path_and_install_dbt_deps_succeeds(): + """ + Passing only dbt_project_path and install_dbt_deps should succeed and set install_dbt_deps to the value defined + """ + project_config = ProjectConfig(dbt_project_path="path/to/dbt/project", install_dbt_deps=False) + assert project_config.install_dbt_deps is False def test_init_with_manifest_path_and_project_path_succeeds(): diff --git a/tests/test_converter.py b/tests/test_converter.py index 6b7d9181ff..8e1b724e03 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -96,9 +96,11 @@ def test_validate_initial_user_config_expects_profile(execution_mode): assert validate_initial_user_config(execution_config, profile_config, project_config, None, {}) is None -@pytest.mark.parametrize("operator_args", [{"env": {"key": "value"}}, {"vars": {"key": "value"}}]) +@pytest.mark.parametrize( + "operator_args", [{"env": {"key": "value"}}, {"vars": {"key": "value"}}, {"install_deps": {"key": "value"}}] +) def test_validate_user_config_operator_args_deprecated(operator_args): - """Deprecating warnings should be raised when using operator_args with "vars" or "env".""" + """Deprecating warnings should be raised when using operator_args with "vars", "env" or "install_deps".""" project_config = ProjectConfig() execution_config = ExecutionConfig() render_config = RenderConfig() @@ -709,6 +711,57 @@ def test_validate_converter_fetches_project_name_from_render_config( assert mock_build_airflow_graph.call_args.kwargs["dbt_project_name"] == "project1" +@pytest.mark.parametrize( + "execution_mode,operator_args,install_dbt_deps,expected", + [ + (ExecutionMode.KUBERNETES, {}, False, None), + (ExecutionMode.LOCAL, {}, False, False), + (ExecutionMode.VIRTUALENV, {}, False, False), + (ExecutionMode.LOCAL, {}, True, True), + (ExecutionMode.VIRTUALENV, {}, True, True), + (ExecutionMode.KUBERNETES, {"install_deps": True}, False, True), + (ExecutionMode.LOCAL, {"install_deps": True}, False, True), + (ExecutionMode.VIRTUALENV, {"install_deps": True}, False, True), + ], +) +@patch("cosmos.config.ProjectConfig.validate_project") +@patch("cosmos.converter.validate_initial_user_config") +@patch("cosmos.converter.DbtGraph") +@patch("cosmos.converter.build_airflow_graph") +def test_project_config_install_dbt_deps_overrides_operator_args( + mock_build_airflow_graph, + mock_user_config, + mock_dbt_graph, + mock_validate_project, + execution_mode, + operator_args, + install_dbt_deps, + expected, +): + """Tests that the value project_config.install_dbt_deps is used to define operator_args["install_deps"] if + execution mode is ExecutionMode.LOCAL or ExecutionMode.VIRTUALENV and operator_args["install_deps"] is not + already defined. + """ + project_config = ProjectConfig(project_name="fake-project", dbt_project_path="/some/project/path") + project_config.install_dbt_deps = install_dbt_deps + execution_config = ExecutionConfig(execution_mode=execution_mode) + render_config = MagicMock() + profile_config = MagicMock() + with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + _, kwargs = mock_build_airflow_graph.call_args + + assert kwargs["task_args"].get("install_deps", None) == expected + + @pytest.mark.parametrize("invocation_mode", [None, InvocationMode.SUBPROCESS, InvocationMode.DBT_RUNNER]) @patch("cosmos.config.ProjectConfig.validate_project") @patch("cosmos.converter.validate_initial_user_config") diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 762985b59d..5f325a22c5 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -2,6 +2,7 @@ import warnings from pathlib import Path +from unittest.mock import patch try: from functools import cache @@ -81,6 +82,9 @@ def get_dag_bag() -> DagBag: if DBT_VERSION < Version("1.5.0"): file.writelines(["example_source_rendering.py\n"]) + if AIRFLOW_VERSION < Version("2.8.0"): + file.writelines("example_cosmos_dbt_build.py\n") + print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) @@ -94,15 +98,7 @@ def get_dag_ids() -> list[str]: return dag_bag.dag_ids -@pytest.mark.skipif( - AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, - reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", -) -@pytest.mark.integration -@pytest.mark.parametrize("dag_id", get_dag_ids()) -def test_example_dag(session, dag_id: str): - if dag_id in KUBERNETES_DAGS: - return +def run_dag(dag_id: str): dag_bag = get_dag_bag() dag = dag_bag.get_dag(dag_id) @@ -130,3 +126,31 @@ def test_example_dag(session, dag_id: str): ) else: test_utils.run_dag(dag) + + +@pytest.mark.skipif( + AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", +) +@pytest.mark.integration +@pytest.mark.parametrize("dag_id", get_dag_ids()) +def test_example_dag(session, dag_id: str): + if dag_id in KUBERNETES_DAGS: + return + run_dag(dag_id) + + +async_dag_ids = ["simple_dag_async"] + + +@pytest.mark.skipif( + AIRFLOW_VERSION < Version("2.8") or AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="See PR: https://github.com/apache/airflow/pull/34585 and Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", +) +@pytest.mark.integration +@patch("cosmos.operators.local.settings.enable_setup_async_task", False) +@patch("cosmos.operators.local.settings.enable_teardown_async_task", False) +@patch("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) +def test_async_example_dag_without_setup_task(session): + for dag_id in async_dag_ids: + run_dag(dag_id) diff --git a/tests/test_io.py b/tests/test_io.py index 7410f05883..816ead519d 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -66,12 +66,13 @@ def test_upload_artifacts_to_azure_wasb(dummy_kwargs): assert hook_instance.load_file.call_count == 2 +@patch("cosmos.io.settings.remote_target_path", None) +@patch("cosmos.io.settings.remote_target_path_conn_id", None) def test_configure_remote_target_path_no_remote_target(): """Test _configure_remote_target_path when no remote target path is set.""" - with patch("cosmos.settings.remote_target_path", None): - from cosmos.io import _configure_remote_target_path + from cosmos.io import _configure_remote_target_path - assert _configure_remote_target_path() == (None, None) + assert _configure_remote_target_path() == (None, None) def test_construct_dest_file_path(dummy_kwargs): @@ -120,49 +121,51 @@ def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs): @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") -@patch("cosmos.io.remote_target_path") -def test_configure_remote_target_path_no_conn_id(mock_remote_target_path): +@patch("cosmos.io.settings.remote_target_path", "s3://bucket/path/to/file") +@patch("cosmos.io.settings.remote_target_path_conn_id", None) +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.io.urlparse") +def test_configure_remote_target_path_no_conn_id(mock_urlparse, mock_object_storage): """Test when no remote_conn_id is provided, but conn_id is resolved from scheme.""" - mock_remote_target_path.return_value = "s3://bucket/path/to/file" + mock_urlparse.return_value.scheme = "s3" mock_storage_path = MagicMock() - with patch("cosmos.io.urlparse") as mock_urlparse: - mock_urlparse.return_value.scheme = "s3" - with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage: - mock_object_storage.return_value = mock_storage_path - mock_storage_path.exists.return_value = True + mock_storage_path.exists.return_value = True + mock_object_storage.return_value = mock_storage_path + result = _configure_remote_target_path() - result = _configure_remote_target_path() - assert result == (mock_object_storage.return_value, _default_s3_conn) + assert result == (mock_object_storage.return_value, _default_s3_conn) @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") -@patch("cosmos.io.remote_target_path") -def test_configure_remote_target_path_conn_id_is_none(mock_remote_target_path): +@patch("cosmos.io.settings.remote_target_path", "abcd://bucket/path/to/file") +@patch("cosmos.io.settings.remote_target_path_conn_id", None) +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.io.urlparse") +def test_configure_remote_target_path_conn_id_is_none(mock_urlparse, mock_object_storage): """Test when conn_id cannot be resolved and is None.""" - mock_remote_target_path.return_value = "abcd://bucket/path/to/file" mock_storage_path = MagicMock() - with patch("cosmos.io.urlparse") as mock_urlparse: - mock_urlparse.return_value.scheme = "abcd" - with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage: - mock_object_storage.return_value = mock_storage_path - mock_storage_path.exists.return_value = True - result = _configure_remote_target_path() - assert result == (None, None) + mock_urlparse.return_value.scheme = "abcd" + mock_storage_path.exists.return_value = True + + result = _configure_remote_target_path() + assert result == (None, None) @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") @patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", False) -@patch("cosmos.io.remote_target_path") -def test_configure_remote_target_path_airflow_io_unavailable(mock_remote_target_path): +@patch("cosmos.io.settings.remote_target_path", "s3://bucket/path/to/file") +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.io.urlparse") +def test_configure_remote_target_path_airflow_io_unavailable(mock_urlparse, mock_object_storage): """Test when AIRFLOW_IO_AVAILABLE is False.""" - mock_remote_target_path.return_value = "s3://bucket/path/to/file" + mock_urlparse.return_value.scheme = "s3" + mock_storage_path = MagicMock() - with patch("cosmos.io.urlparse") as mock_urlparse: - mock_urlparse.return_value.scheme = "s3" - with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage: - mock_object_storage.return_value = mock_storage_path - mock_storage_path.exists.return_value = True - with pytest.raises(CosmosValueError) as exc_info: - _configure_remote_target_path() + mock_storage_path.exists.return_value = True + mock_object_storage.return_value = mock_storage_path + + with pytest.raises(CosmosValueError) as exc_info: + _configure_remote_target_path() + assert "Object Storage feature is unavailable" in str(exc_info.value)