Skip to content

Support rendering DbtDag in Airflow 3#1657

Merged
tatiana merged 1 commit into
mainfrom
fix-dag-rendering-af3main
Apr 9, 2025
Merged

Support rendering DbtDag in Airflow 3#1657
tatiana merged 1 commit into
mainfrom
fix-dag-rendering-af3main

Conversation

@tatiana
Copy link
Copy Markdown
Collaborator

@tatiana tatiana commented Apr 9, 2025

Add necessary changes so we can render Cosmos DbtDag using LoadMode.DBT_LS when using Airflow 3 main branch (commit 9d6bebe9690a3f21a00044c0aeceaded86484a0c).

Previously, we were unable to run the DAG:

import os
from datetime import datetime
from pathlib import Path
from airflow.models import Variable

from cosmos import DbtDag, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig, TestBehavior

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent.parent.parent / "dev/dags/dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

project_config = ProjectConfig(
    dbt_project_path=DBT_PROJ_DIR,
    project_name="jaffle_shop",
)

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)

basic_cosmos_dag = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=project_config,
    profile_config=profile_config,
    operator_args={
        "install_deps": True,  # install any necessary dependencies before running any dbt command
        "full_refresh": True,  # used only in dbt commands that support this flag
    },
    # normal dag parameters
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="basic_cosmos_dag",
    default_args={

It would raise the following exception when we attempted to run airflow dags reserialize:

i  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/airflow/graph.py", line 592, in build_airflow_graph
    task_or_group = conversion_function(  # type: ignore
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/airflow/graph.py", line 391, in generate_task_or_group
    task >> test_task
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv-af3-main/lib/python3.10/site-packages/airflow/sdk/definitions/_internal/mixins.py", line 97, in __rshift__
    self.set_downstream(other)
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv-af3-main/lib/python3.10/site-packages/airflow/sdk/definitions/_internal/node.py", line 216, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv-af3-main/lib/python3.10/site-packages/airflow/sdk/definitions/_internal/node.py", line 186, in _set_relatives
    raise ValueError(
ValueError: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again: [<Task(DbtRunLocalOperator): run>, <Task(DbtTestLocalOperator): test>]

This issue happened due to an interface change in Airflow 3's BaseOperator, that would lead the line:

base_operator_args = set(inspect.signature(BaseOperator.__init__).parameters.keys())

To return zero arguments.

This was solved by changing the import path:

try:  # Airflow 3
    from airflow.sdk.bases.operator import BaseOperator
except ImportError:  # Airflow 2
    from airflow.models import BaseOperator

After these changes, we were able to successfully run:

  • airflow dags reserialize
  • airflow dags test basic_cosmos_dag

Co-authored-by: Ash Berlin-Taylor ash@astronomer.io

Add necessary changes so we can render Cosmos `DbtDag` using `LoadMode.DBT_LS` when using Airflow 3 main branch  (commit 9d6bebe9690a3f21a00044c0aeceaded86484a0c).

Previously, we were unable to run the DAG:
```
import os
from datetime import datetime
from pathlib import Path
from airflow.models import Variable

from cosmos import DbtDag, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig, TestBehavior

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent.parent.parent / "dev/dags/dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

project_config = ProjectConfig(
    dbt_project_path=DBT_PROJ_DIR,
    project_name="jaffle_shop",
)

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)

basic_cosmos_dag = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=project_config,
    profile_config=profile_config,
    operator_args={
        "install_deps": True,  # install any necessary dependencies before running any dbt command
        "full_refresh": True,  # used only in dbt commands that support this flag
    },
    # normal dag parameters
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="basic_cosmos_dag",
    default_args={
```

It would raise the following exception when we attempted to run `airflow dags reserialize`:
```
i  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/airflow/graph.py", line 592, in build_airflow_graph
    task_or_group = conversion_function(  # type: ignore
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/airflow/graph.py", line 391, in generate_task_or_group
    task >> test_task
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv-af3-main/lib/python3.10/site-packages/airflow/sdk/definitions/_internal/mixins.py", line 97, in __rshift__
    self.set_downstream(other)
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv-af3-main/lib/python3.10/site-packages/airflow/sdk/definitions/_internal/node.py", line 216, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv-af3-main/lib/python3.10/site-packages/airflow/sdk/definitions/_internal/node.py", line 186, in _set_relatives
    raise ValueError(
ValueError: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again: [<Task(DbtRunLocalOperator): run>, <Task(DbtTestLocalOperator): test>]
```

This issue happened due to an interface change in Airflow 3's `BaseOperator`, that would lead the line:
```
base_operator_args = set(inspect.signature(BaseOperator.__init__).parameters.keys())
```

To return zero arguments.

This was solved by changing the import path:

```
try:  # Airflow 3
    from airflow.sdk.bases.operator import BaseOperator
except ImportError:  # Airflow 2
    from airflow.models import BaseOperator
```

After these changes, we were able to successfully run:
- `airflow dags reserialize`
- `airflow dags test basic_cosmos_dag`
Copilot AI review requested due to automatic review settings April 9, 2025 15:17
@dosubot dosubot Bot added the size:M This PR changes 30-99 lines, ignoring generated files. label Apr 9, 2025
@netlify
Copy link
Copy Markdown

netlify Bot commented Apr 9, 2025

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit 4629518
🔍 Latest deploy log https://app.netlify.com/sites/sunny-pastelito-5ecb04/deploys/67f68f8011c9270008cb1220

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

@dosubot dosubot Bot added the area:rendering Related to rendering, like Jinja, Airflow tasks, etc label Apr 9, 2025
@cloudflare-workers-and-pages
Copy link
Copy Markdown

Deploying astronomer-cosmos with  Cloudflare Pages  Cloudflare Pages

Latest commit: 4629518
Status: ✅  Deploy successful!
Preview URL: https://eb8c0d59.astronomer-cosmos.pages.dev
Branch Preview URL: https://fix-dag-rendering-af3main.astronomer-cosmos.pages.dev

View logs

@tatiana
Copy link
Copy Markdown
Collaborator Author

tatiana commented Apr 9, 2025

The test coverage for this PR will fail, but it is expected to be solved once the following ticket is implemented:
#1646

Copy link
Copy Markdown
Contributor

@pankajastro pankajastro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 9, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 97.47%. Comparing base (a0a1cb5) to head (4629518).
Report is 1 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #1657   +/-   ##
=======================================
  Coverage   97.47%   97.47%           
=======================================
  Files          80       80           
  Lines        4982     4991    +9     
=======================================
+ Hits         4856     4865    +9     
  Misses        126      126           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@tatiana tatiana merged commit fb613a5 into main Apr 9, 2025
@tatiana tatiana deleted the fix-dag-rendering-af3main branch April 9, 2025 15:42
Copy link
Copy Markdown
Contributor

@pankajkoti pankajkoti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so glad, it was this change and we did not have to do more altercations. Thanks a lot @tatiana for getting this to work 👏🏽

@tatiana
Copy link
Copy Markdown
Collaborator Author

tatiana commented Apr 10, 2025

anai-s pushed a commit to anai-s/astronomer-cosmos that referenced this pull request Apr 15, 2025
Add necessary changes so we can render Cosmos `DbtDag` using
`LoadMode.DBT_LS` when using Airflow 3 main branch (commit
9d6bebe9690a3f21a00044c0aeceaded86484a0c).

Previously, we were unable to run the DAG:
```
import os
from datetime import datetime
from pathlib import Path
from airflow.models import Variable

from cosmos import DbtDag, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig, TestBehavior

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent.parent.parent / "dev/dags/dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

project_config = ProjectConfig(
    dbt_project_path=DBT_PROJ_DIR,
    project_name="jaffle_shop",
)

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)

basic_cosmos_dag = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=project_config,
    profile_config=profile_config,
    operator_args={
        "install_deps": True,  # install any necessary dependencies before running any dbt command
        "full_refresh": True,  # used only in dbt commands that support this flag
    },
    # normal dag parameters
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="basic_cosmos_dag",
    default_args={
```

It would raise the following exception when we attempted to run `airflow
dags reserialize`:
```
i  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/airflow/graph.py", line 592, in build_airflow_graph
    task_or_group = conversion_function(  # type: ignore
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/cosmos/airflow/graph.py", line 391, in generate_task_or_group
    task >> test_task
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv-af3-main/lib/python3.10/site-packages/airflow/sdk/definitions/_internal/mixins.py", line 97, in __rshift__
    self.set_downstream(other)
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv-af3-main/lib/python3.10/site-packages/airflow/sdk/definitions/_internal/node.py", line 216, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
  File "/Users/tati/Code/cosmos-fresh/astronomer-cosmos/venv-af3-main/lib/python3.10/site-packages/airflow/sdk/definitions/_internal/node.py", line 186, in _set_relatives
    raise ValueError(
ValueError: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again: [<Task(DbtRunLocalOperator): run>, <Task(DbtTestLocalOperator): test>]
```

This issue happened due to an interface change in Airflow 3's
`BaseOperator`, that would lead the line:
```
base_operator_args = set(inspect.signature(BaseOperator.__init__).parameters.keys())
```

To return zero arguments.

This was solved by changing the import path:

```
try:  # Airflow 3
    from airflow.sdk.bases.operator import BaseOperator
except ImportError:  # Airflow 2
    from airflow.models import BaseOperator
```

After these changes, we were able to successfully run:
- `airflow dags reserialize`
- `airflow dags test basic_cosmos_dag`

Co-authored-by: Ash Berlin-Taylor <ash@astronomer.io>
@tatiana tatiana mentioned this pull request Apr 16, 2025
@tatiana tatiana added this to the Cosmos 1.10.0 milestone Apr 17, 2025
tatiana added a commit that referenced this pull request May 1, 2025
Features

* Airflow 3 support
* Support running ``dbt deps`` incrementally to pre-defined
``dbt_packages`` by @tatiana in #1668 and #1670
* Add ``DuckDB`` profile mapping by @prithvijitguha and @pankajastro in
#1553
* Implement DBT exposure selector by ghjklw #1717

Bug Fixes

* Fix ``test_indirect_selection`` flag to be propagated in case of
``TestBehavior.BUILD`` by @corsettigyg in #1663
* Fix ``select`` clause in the case of detached tests by @anyapriya in
#1680
* Operator argument fixes by @johnhoran in #1648


Airflow 3 Support

* Support rendering DbtDag in Airflow 3 by @tatiana and @ashb in #1657
* Refactor Rendered Task Instance Fields (RTIF) handling for Airflow 2.x
and 3.x by @pankajkoti in #1661
* Run cosmos operator in Airflow 3 by @pankajastro in #1642
* Fix ``python_virtualenv.prepare_env`` top-level import for Airflow 3
by @pankajkoti in #1678
* Fix Variable not found issue in Airflow 3 by @tatiana in #1684
* Disable CosmosPlugin on Airflow 3 setup by @pankajkoti in #1692, #1698
* Use ``schedule`` param in example DAGs instead of the 2.10 deprecated
and 3.0 removed ``schedule_interval`` by @pankajkoti in #1701
* Ensure ``virtualenv_dir`` path exists by @pankajkoti in #1724
* Support emitting Assets with Airflow 3 by @tatiana in #1713
* Add docs on Airflow 3 compatibility by @pankajkoti and @tatiana in
#1731
* Introduce, test and document asset/dataset breaking change by @tatiana
in #1672
* Improve dataset/asset driven scheduling documentation by @tatiana in
#1729

Enhancements

* Allow multiple callbacks by @corsettigyg #1693
* Refactor kubernetes warning callback handling by @canbekley in #1681

Documentation

* Add documentation related to ``copy_dbt_packages`` by @tatiana in
#1671
* Make wording and command consistent in the contributing doc by
@pankajkoti in #1697
* Add MonteCarlo callback example for importing dbt artifacts by
@corsettigyg #1695
* Change async feature to be non-experimental by @tatiana in #1732

Others

* Add sample ``dbt_packages`` to validate incremental ``dbt deps`` by
@tatiana in #1669
* Add kubernetes execution mode example in Airflow 3 by @pankajastro in
#1667
* Check only major version until Airflow 3 stable release by
@pankajastro in #1665
* Install Airflow from main branch by @pankajastro in #1660
* Add dev tool for Airflow 3 by @pankajastro and @tatiana in #1627
* Improve Airflow 3 tooling by @pankajastro in #1656
* Skip associating ``openlineage_events_completes`` to ``ti`` in Airflow
3 by @pankajkoti in #1662
* Add .gitignore file for the scripts/airflow3 directory by @pankajkoti
in #1658
* Remove ``original_jaffle_shop`` dbt project by @pankajkoti in #1676
* Fix or ignore type check error by @pankajastro in #1687
* Run virtualenv example with Airflow 3 tooling by @pankajastro in #1686
* Enable running setup/teardown tasks with Async execution DAG with
Airflow 3 tooling by @pankajastro in #1696
* Enable integration tests for the DuckDB adapter by @pankajastro in
#1699
* Add Airflow 3 tests matrix entries in CI by @pankajkoti in #1646
* Use a different way to get tasks count for asserting test_perf_dag by
@pankajkoti in #1714
* Reinstall Airflow 3 dependency on ``pydantic>=2.11`` for dbt adapter
versions 1.6 & 1.9 by @pankajkoti in #1715
* Fix outdated ``echo`` in Airflow 3 tooling script #1700
* Add files not needed for git tracking to .gitignore by @pankajkoti in
#1723
* Use latest minor versions for dbt adapters to get in compatibility
fixes by @pankajkoti in #1719
* Fix Airflow 3 tests raising generate_run_id() takes 0 positional
arguments by @tatiana in #1725
* Fix dataset tests failing in Airflow 3 by @tatiana in #1716
* Enable example DAGs to run in CI that were disabled in PR #1646 by
@pankajkoti in #1726
* Pre-commit updates: #1666, #1653, #1641, #1682, #1720


Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Co-authored-by: Pankaj Singh
<98807258+pankajastro@users.noreply.github.com>

---------

Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:rendering Related to rendering, like Jinja, Airflow tasks, etc size:M This PR changes 30-99 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants