Support emitting Assets with Airflow 3#1713

Merged: tatiana merged 16 commits into main from dataset-support-af3 on Apr 30, 2025

@tatiana
Collaborator

@tatiana tatiana commented Apr 28, 2025

Supports emitting Assets (Datasets) when using Cosmos with Airflow 3.

This implementation was tested in two ways:

  1. By using `airflow dags test`:
     `airflow dags test example_operators`

We were able to observe via the logs that we're reaching the desired code:

[2025-04-28T13:30:44.272+0100] {runner.py:60} INFO - Trying to run dbtRunner with:
 ['run', '--models', 'stg_customers', '--project-dir', '/var/folders/td/522y78v91d1f5wgh67mj3p0m0000gn/T/tmp0wo7rsn5', '--profiles-dir', '/Users/tati/Code/cosmos-fresh/astronomer-cosmos/dev/dags/dbt/jaffle_shop', '--profile', 'default', '--target', 'dev']
 in /var/folders/td/522y78v91d1f5wgh67mj3p0m0000gn/T/tmp0wo7rsn5
12:30:44  Running with dbt=1.9.4
12:30:44  Registered adapter: postgres=1.9.0
12:30:44  Unable to do partial parsing because saved manifest not found. Starting full parse.
12:30:44  Found 5 models, 3 seeds, 18 data tests, 548 macros
12:30:44  
12:30:44  Concurrency: 4 threads (target='dev')
12:30:44  
12:30:45  1 of 1 START sql view model public.stg_customers ............................... [RUN]
12:30:45  1 of 1 OK created sql view model public.stg_customers .......................... [CREATE VIEW in 0.07s]
12:30:45  
12:30:45  Finished running 1 view model in 0 hours 0 minutes and 0.15 seconds (0.15s).
12:30:45  
12:30:45  Completed successfully
12:30:45  
12:30:45  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
[2025-04-28T13:30:45.635+0100] {local.py:490} INFO - Inlets: []
[2025-04-28T13:30:45.635+0100] {local.py:491} INFO - Outlets: [Asset(name='postgres://localhost:5432/postgres.public.stg_customers', uri='postgres://localhost:5432/postgres.public.stg_customers', group='asset', extra={}, watchers=[])]
  2. By using `airflow standalone` and triggering the DAG `example_operators` manually.

We also created a DAG that consumes the dataset created by example_operators, so we could confirm the DAG was being triggered via the generated dataset/alias:

from airflow import DAG
from airflow.sdk import Asset
from airflow.operators.empty import EmptyOperator


with DAG(
    "dataset_triggered_dag",
    description="A DAG that should be triggered via Dataset",
    schedule=[Asset("postgres://localhost:5432/postgres.public.stg_customers")],
) as dag:
    t1 = EmptyOperator(
        task_id="task_1",
    )
    t2 = EmptyOperator(
        task_id="task_2",
    )
    t1 >> t2

We were able to observe via the logs that we're reaching the desired code:

[2025-04-28, 13:18:30] INFO - Outlets: [Asset(name='postgres://localhost:5432/postgres.public.stg_customers', uri='postgres://localhost:5432/postgres.public.stg_customers', group='asset', extra={}, watchers=[])]: source="cosmos.operators.base"
[2025-04-28, 13:18:30] INFO - Assigning inlets/outlets with DatasetAlias in Airflow 3: source="cosmos.operators.local"

And we observed via Airflow UI the DAG dataset_triggered_dag being triggered.

Some screenshots:
Screenshot 2025-04-29 at 13 06 27
Screenshot 2025-04-29 at 13 06 41
Screenshot 2025-04-29 at 13 07 08

While implementing this feature, we identified a few limitations of the Airflow 3.0.0 assets implementation, which were discussed with @uranusjr:

  1. As of Airflow 3.0.0, we have to manually associate the `AssetAlias` with the `self.outlets` of the Operator
  2. It is not possible to see outlets in the task instance UI
  3. If the `AssetAlias` is created during task execution, the UI does not display the source DAG + event + triggered DAG
  4. If we create the `AssetAlias` during task initialization, the aliases are displayed incorrectly in the UI, and we don't want to clutter the user's UI with aliases that may not link to any actual Assets:

Screenshot 2025-04-29 at 10 58 18
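
To make limitation 1 concrete, here is a minimal sketch of attaching an alias to an operator's outlets at execution time. It is not the Cosmos implementation: `AssetAlias` and `Asset` are stand-in dataclasses (so the snippet runs without Airflow installed), `SketchOperator` and the alias name are hypothetical, and a plain dict stands in for the `outlet_events` accessor from the task context.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class AssetAlias:  # stand-in for airflow.sdk.AssetAlias
    name: str


@dataclass(frozen=True)
class Asset:  # stand-in for airflow.sdk.Asset
    uri: str


@dataclass
class SketchOperator:
    """Hypothetical operator that registers its alias at execution time."""

    alias_name: str
    outlets: list = field(default_factory=list)

    def execute(self, outlet_events: dict, uris: list[str]) -> None:
        alias = AssetAlias(self.alias_name)
        # Limitation 1: the alias is attached to the operator's outlets by hand.
        self.outlets.append(alias)
        # Record the concrete assets produced by this run under the alias.
        outlet_events.setdefault(alias, []).extend(Asset(uri) for uri in uris)


# Hypothetical alias name; the URI is the one shown in the logs above.
operator = SketchOperator(alias_name="example_operators__stg_customers")
outlet_events: dict = {}
operator.execute(
    outlet_events,
    ["postgres://localhost:5432/postgres.public.stg_customers"],
)
```

Creating the alias inside `execute` rather than in `__init__` mirrors the trade-off in items 3 and 4: nothing appears in the UI until a run actually produces assets.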

Closes: #1635

@netlify

netlify Bot commented Apr 28, 2025

Deploy Preview for sunny-pastelito-5ecb04 canceled.

| Name | Link |
|---|---|
| 🔨 Latest commit | d4d9c1c |
| 🔍 Latest deploy log | https://app.netlify.com/sites/sunny-pastelito-5ecb04/deploys/68123f6227bf5d0008fff11f |

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented Apr 28, 2025

Deploying astronomer-cosmos with Cloudflare Pages

Latest commit: d4d9c1c
Status:⚡️  Build in progress...

View logs

@codecov

codecov Bot commented Apr 28, 2025

Codecov Report

Attention: Patch coverage is 92.85714% with 1 line in your changes missing coverage. Please review.

Project coverage is 97.60%. Comparing base (a4e0805) to head (9ce5631).
Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| cosmos/operators/local.py | 92.85% | 1 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1713      +/-   ##
==========================================
+ Coverage   97.58%   97.60%   +0.02%     
==========================================
  Files          83       83              
  Lines        5174     5180       +6     
==========================================
+ Hits         5049     5056       +7     
+ Misses        125      124       -1     
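
The figures in this report can be cross-checked with a few lines of arithmetic. The sketch below rests on two assumptions that the report does not state: that percentages are truncated (not rounded) to two decimals, and that the patch touched 14 lines, which is inferred from "92.85714% with 1 line missing".

```python
def coverage_pct(hits: int, lines: int) -> float:
    """Coverage percentage truncated to two decimals (assumed behaviour)."""
    # Integer arithmetic keeps the truncation step exact.
    return (hits * 10_000 // lines) / 100


base = coverage_pct(5049, 5174)  # main
head = coverage_pct(5056, 5180)  # this PR
patch = coverage_pct(13, 14)     # inferred: 14 changed lines, 1 uncovered

print(base, head, patch)
```

Hits grow by 7 while lines grow by only 6 because one previously uncovered line is now covered, which matches the drop in misses from 125 to 124.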

@tatiana tatiana force-pushed the dataset-support-af3 branch from 5fb3d6e to 8b48084 April 28, 2025 13:33
@tatiana tatiana force-pushed the dataset-support-af3 branch from 8b48084 to 9ce5631 April 29, 2025 10:35
@tatiana tatiana changed the title from "WIP: Support emitting Assets with Airflow 3" to "Support emitting Assets with Airflow 3" Apr 29, 2025
@tatiana tatiana force-pushed the dataset-support-af3 branch from 9ce5631 to 4156761 April 29, 2025 16:09
tatiana added a commit that referenced this pull request Apr 29, 2025
Previously, the following tests were failing:

```
FAILED tests/operators/test_local.py::test_run_operator_dataset_inlets_and_outlets_airflow_210_onwards - ModuleNotFoundError: No module named 'airflow.models.dataset'
```

Details:
```
_______ test_run_operator_dataset_inlets_and_outlets_airflow_210_onwards _______

caplog = <_pytest.logging.LogCaptureFixture object at 0x7f1234faf3a0>

    @pytest.mark.skipif(
        version.parse(airflow_version) < version.parse("2.10"),
        reason="From Airflow 2.10 onwards, we started using DatasetAlias, which changed this behaviour.",
    )
    @pytest.mark.integration
    def test_run_operator_dataset_inlets_and_outlets_airflow_210_onwards(caplog):
>       from airflow.models.dataset import DatasetAliasModel
E       ModuleNotFoundError: No module named 'airflow.models.dataset'

tests/operators/test_local.py:471: ModuleNotFoundError
```

The test `test_run_operator_dataset_url_encoded_names` will be handled
in the PR #1713

Closes: #1704
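
The failure above comes from importing a module path that is missing in the Airflow 3 environment. One way to make such an import version-tolerant is to try candidate locations in order. This is a minimal sketch, not the fix applied in the referenced commit: `import_first` is a hypothetical helper, and the Airflow 3 location `airflow.models.asset.AssetAliasModel` is an assumption based on the Dataset-to-Asset rename.

```python
import importlib
from typing import Any


def import_first(candidates: list[tuple[str, str]]) -> Any:
    """Return the first attribute importable from (module, name) pairs."""
    for module_name, attr_name in candidates:
        try:
            module = importlib.import_module(module_name)
        except ImportError:  # ModuleNotFoundError is a subclass of ImportError
            continue
        if hasattr(module, attr_name):
            return getattr(module, attr_name)
    raise ImportError(f"None of the candidates could be imported: {candidates}")


# Candidate locations, newest first. The first path is an assumption.
ALIAS_MODEL_CANDIDATES = [
    ("airflow.models.asset", "AssetAliasModel"),      # Airflow 3 (assumed)
    ("airflow.models.dataset", "DatasetAliasModel"),  # Airflow 2.10, per the traceback
]
```

A test could then call `import_first(ALIAS_MODEL_CANDIDATES)` inside its body, so the module under test still imports cleanly when neither path exists.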
@tatiana tatiana marked this pull request as ready for review April 29, 2025 16:14
Copilot AI review requested due to automatic review settings April 29, 2025 16:14
@dosubot dosubot Bot added the size:L (This PR changes 100-499 lines, ignoring generated files.) label Apr 29, 2025
Contributor

Copilot AI left a comment

Pull Request Overview

This PR adds support for emitting Assets (Datasets) when using Airflow 3 by updating test cases and adapting operator logic to work with the new Airflow APIs. Key changes include renaming tests to reflect Airflow version support, adding new integration tests for Airflow 3 behavior regarding AssetAlias, and modifying dataset handling (now using Assets instead of Datasets) in cosmos/operators/local.py.

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

| File | Description |
|---|---|
| tests/operators/test_local.py | Updated tests to skip or adjust for Airflow 3 behavior and to raise errors as needed. |
| cosmos/operators/local.py | Refactored dataset handling to use Assets/AssetAlias and updated openlineage handling. |

Comments suppressed due to low confidence (2)

tests/operators/test_local.py:563

  • It appears the second log check is missing the 'assert' keyword, so the condition is not actually being verified. Please add 'assert' to ensure this log message is tested.
"Outlets: [Asset(name='postgres://0.0.0.0:5432/postgres/public.stg_customers', uri='postgres://0.0.0.0:5432/postgres/public.stg_customers'" in caplog.text
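
To see why the missing `assert` matters: a bare comparison expression in a test body is evaluated and then discarded, so the test passes whatever the result. The following is a minimal illustration with hypothetical function names and a stand-in log string, not the actual Cosmos test.

```python
LOG_TEXT = "Inlets: []"  # stand-in for caplog.text; deliberately lacks "Outlets"


def check_without_assert() -> None:
    # The membership test evaluates to False, but the result is discarded.
    "Outlets: [Asset(" in LOG_TEXT


def check_with_assert() -> None:
    # With `assert`, a False result raises AssertionError and fails the test.
    assert "Outlets: [Asset(" in LOG_TEXT


check_without_assert()  # completes silently even though the text is absent

try:
    check_with_assert()
except AssertionError:
    print("assertion failed, as it should")
```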

cosmos/operators/local.py:713

  • [nitpick] Consider storing the AssetAlias instance in a local variable and reusing it (e.g. for both appending to self.outlets and as a dictionary key) to ensure consistency and avoid potential equality issues.
self.outlets.append(AssetAlias(dataset_alias_name))
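
The nitpick guards against a general Python pitfall: if a class does not define value-based `__eq__` and `__hash__`, two instances built from the same name are different dictionary keys. The sketch below uses a hypothetical stand-in class, `PlainAlias`, and a hypothetical alias name to show the effect. The real Airflow `AssetAlias` may well compare by value, in which case reusing one instance is simply tidier.

```python
class PlainAlias:
    """Hypothetical stand-in with default identity-based equality."""

    def __init__(self, name: str) -> None:
        self.name = name


outlets: list[PlainAlias] = []
events: dict[PlainAlias, list[str]] = {}

# Build the alias once and reuse the same object in both places.
alias = PlainAlias("my_dag__my_task")
outlets.append(alias)
events[alias] = ["postgres://localhost:5432/postgres.public.stg_customers"]

# Looking up with the stored instance works ...
found_with_same_instance = outlets[0] in events
# ... but a second instance with the same name is a different key.
found_with_new_instance = PlainAlias("my_dag__my_task") in events
```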

@dosubot dosubot Bot added the area:datasets (Related to the Airflow datasets feature/module) label Apr 29, 2025
Contributor

@pankajkoti pankajkoti left a comment

LGTM. Thanks for adding this support so quickly. I have some questions inline for my understanding and a few minor suggestions, if you'd like to consider them.

Happy to merge the PR once CI passes.

Comment thread cosmos/operators/local.py Outdated
Comment thread cosmos/operators/local.py
Comment thread tests/operators/test_local.py Outdated
Comment thread tests/operators/test_local.py Outdated
Comment thread tests/operators/test_local.py Outdated
Comment thread tests/operators/test_local.py Outdated
Comment thread tests/operators/test_local.py Outdated
Comment thread tests/operators/test_local.py Outdated
Comment thread tests/test_example_dags.py Outdated
@tatiana tatiana added this to the Cosmos 1.10.0 milestone Apr 30, 2025
@tatiana tatiana merged commit 44b7af7 into main Apr 30, 2025
0 of 2 checks passed
@tatiana tatiana deleted the dataset-support-af3 branch April 30, 2025 15:19
@tatiana tatiana mentioned this pull request Apr 30, 2025
tatiana added a commit that referenced this pull request May 1, 2025
Improve asset/dataset event scheduling after implementing #1713
tatiana added a commit that referenced this pull request May 1, 2025
Features

* Airflow 3 support
* Support running ``dbt deps`` incrementally to pre-defined
``dbt_packages`` by @tatiana in #1668 and #1670
* Add ``DuckDB`` profile mapping by @prithvijitguha and @pankajastro in
#1553
* Implement DBT exposure selector by @ghjklw in #1717

Bug Fixes

* Fix ``test_indirect_selection`` flag to be propagated in case of
``TestBehavior.BUILD`` by @corsettigyg in #1663
* Fix ``select`` clause in the case of detached tests by @anyapriya in
#1680
* Operator argument fixes by @johnhoran in #1648


Airflow 3 Support

* Support rendering DbtDag in Airflow 3 by @tatiana and @ashb in #1657
* Refactor Rendered Task Instance Fields (RTIF) handling for Airflow 2.x
and 3.x by @pankajkoti in #1661
* Run cosmos operator in Airflow 3 by @pankajastro in #1642
* Fix ``python_virtualenv.prepare_env`` top-level import for Airflow 3
by @pankajkoti in #1678
* Fix Variable not found issue in Airflow 3 by @tatiana in #1684
* Disable CosmosPlugin on Airflow 3 setup by @pankajkoti in #1692, #1698
* Use ``schedule`` param in example DAGs instead of the 2.10 deprecated
and 3.0 removed ``schedule_interval`` by @pankajkoti in #1701
* Ensure ``virtualenv_dir`` path exists by @pankajkoti in #1724
* Support emitting Assets with Airflow 3 by @tatiana in #1713
* Add docs on Airflow 3 compatibility by @pankajkoti and @tatiana in
#1731
* Introduce, test and document asset/dataset breaking change by @tatiana
in #1672
* Improve dataset/asset driven scheduling documentation by @tatiana in
#1729

Enhancements

* Allow multiple callbacks by @corsettigyg in #1693
* Refactor kubernetes warning callback handling by @canbekley in #1681

Documentation

* Add documentation related to ``copy_dbt_packages`` by @tatiana in
#1671
* Make wording and command consistent in the contributing doc by
@pankajkoti in #1697
* Add MonteCarlo callback example for importing dbt artifacts by
@corsettigyg in #1695
* Change async feature to be non-experimental by @tatiana in #1732

Others

* Add sample ``dbt_packages`` to validate incremental ``dbt deps`` by
@tatiana in #1669
* Add kubernetes execution mode example in Airflow 3 by @pankajastro in
#1667
* Check only major version until Airflow 3 stable release by
@pankajastro in #1665
* Install Airflow from main branch by @pankajastro in #1660
* Add dev tool for Airflow 3 by @pankajastro and @tatiana in #1627
* Improve Airflow 3 tooling by @pankajastro in #1656
* Skip associating ``openlineage_events_completes`` to ``ti`` in Airflow
3 by @pankajkoti in #1662
* Add .gitignore file for the scripts/airflow3 directory by @pankajkoti
in #1658
* Remove ``original_jaffle_shop`` dbt project by @pankajkoti in #1676
* Fix or ignore type check error by @pankajastro in #1687
* Run virtualenv example with Airflow 3 tooling by @pankajastro in #1686
* Enable running setup/teardown tasks with Async execution DAG with
Airflow 3 tooling by @pankajastro in #1696
* Enable integration tests for the DuckDB adapter by @pankajastro in
#1699
* Add Airflow 3 tests matrix entries in CI by @pankajkoti in #1646
* Use a different way to get tasks count for asserting test_perf_dag by
@pankajkoti in #1714
* Reinstall Airflow 3 dependency on ``pydantic>=2.11`` for dbt adapter
versions 1.6 & 1.9 by @pankajkoti in #1715
* Fix outdated ``echo`` in Airflow 3 tooling script #1700
* Add files not needed for git tracking to .gitignore by @pankajkoti in
#1723
* Use latest minor versions for dbt adapters to get in compatibility
fixes by @pankajkoti in #1719
* Fix Airflow 3 tests raising generate_run_id() takes 0 positional
arguments by @tatiana in #1725
* Fix dataset tests failing in Airflow 3 by @tatiana in #1716
* Enable example DAGs to run in CI that were disabled in PR #1646 by
@pankajkoti in #1726
* Pre-commit updates: #1666, #1653, #1641, #1682, #1720


Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Co-authored-by: Pankaj Singh
<98807258+pankajastro@users.noreply.github.com>

---------

Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>

Labels

`area:datasets` Related to the Airflow datasets feature/module
`area:testing` Related to testing, like unit tests, integration tests, etc
`execution:local` Related to Local execution environment
`size:L` This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Leverage Airflow 3 assets if it is installed, otherwise use DatasetAlias and Dataset

3 participants