
Improve dataset/asset experience in Cosmos #2030

Merged
tatiana merged 11 commits into main from improve-assets on Oct 14, 2025


Collaborator

@tatiana tatiana commented Oct 13, 2025

While validating Cosmos Airflow Asset support in Airflow 3.1, I noticed that we could improve the overall experience.

Previously, Cosmos required both openlineage-integration-common and openlineage-airflow or apache-airflow-providers-openlineage to be installed to emit Airflow Datasets or Airflow Assets. However, the only necessary dependency is openlineage-integration-common.

Users need either openlineage-airflow or apache-airflow-providers-openlineage to emit OpenLineage events, but not to emit Airflow Datasets and Aliases.

This PR improves this behaviour by adding openlineage-integration-common as a dependency of the standard Cosmos installation, and by no longer failing to emit datasets when neither openlineage-airflow nor apache-airflow-providers-openlineage is installed.

This PR was validated with Airflow 3.1.0 using the following three DAGs:

Producer Cosmos DbtDag

Existing basic_cosmos_dag example

import os
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJECT_NAME = os.getenv("DBT_PROJECT_NAME", "jaffle_shop")
DBT_PROJECT_PATH = DBT_ROOT_PATH / DBT_PROJECT_NAME


profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="example_conn",
        profile_args={"schema": "public"},
        disable_event_tracking=True,
    ),
)

basic_cosmos_dag = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(DBT_PROJECT_PATH),
    profile_config=profile_config,
    operator_args={
        "install_deps": True,  # install any necessary dependencies before running any dbt command
        "full_refresh": True,  # used only in dbt commands that support this flag
    },
    # normal dag parameters
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="basic_cosmos_dag",
    default_args={"retries": 0},
)

Consumer DAG scheduled based on an Asset

from datetime import datetime
from airflow import DAG
from airflow.sdk.definitions.asset import Asset
from airflow.operators.empty import EmptyOperator


with DAG(
    "dataset_triggered_dag",
    description="A DAG that should be triggered via Dataset",
    start_date=datetime(2024, 9, 1),
    schedule=[Asset(uri="postgres://0.0.0.0:5432/postgres/public/orders")],
) as dag:
    t1 = EmptyOperator(
        task_id="task_1",
    )
    t2 = EmptyOperator(
        task_id="task_2",
    )

    t1 >> t2
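
For reference, the URI in the schedule above has the shape of an OpenLineage-style dataset namespace (`postgres://0.0.0.0:5432`) followed by the database, schema and table. A minimal sketch of how such a URI could be assembled is shown below. The helper name `build_asset_uri` and the dot-to-slash conversion are assumptions made for illustration; this is not the Cosmos implementation, so check the URIs shown in the Airflow UI before relying on a particular format.

```python
from urllib.parse import quote


def build_asset_uri(namespace: str, name: str) -> str:
    """Join an OpenLineage-style namespace and dotted dataset name into a URI.

    Hypothetical helper for illustration only; not part of Cosmos or Airflow.
    """
    # Percent-encode each dotted segment and join the segments with "/".
    path = "/".join(quote(part, safe="") for part in name.split("."))
    return f"{namespace.rstrip('/')}/{path}"


uri = build_asset_uri("postgres://0.0.0.0:5432", "postgres.public.orders")
print(uri)  # postgres://0.0.0.0:5432/postgres/public/orders
```

The consumer DAG only runs if its `Asset` URI matches the emitted one exactly, so a mismatch in host, port or schema would leave it unscheduled.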

Consumer DAG scheduled based on an AssetAlias

from datetime import datetime
from airflow import DAG
from airflow.sdk.definitions.asset import AssetAlias
from airflow.operators.empty import EmptyOperator


with DAG(
    "datasetalias_triggered_dag",
    description="A DAG that should be triggered via Dataset alias",
    start_date=datetime(2024, 9, 1),
    schedule=[AssetAlias(name="basic_cosmos_dag__orders__run")],
) as dag:

    t3 = EmptyOperator(
        task_id="task_3",
    )

    t3

Copilot AI review requested due to automatic review settings October 13, 2025 14:41

netlify Bot commented Oct 13, 2025

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit ea1919f
🔍 Latest deploy log https://app.netlify.com/projects/sunny-pastelito-5ecb04/deploys/68ee13ab3949f000086f0815

Contributor

Copilot AI left a comment


Pull Request Overview

This PR improves the dataset/asset experience in Cosmos by making openlineage-integration-common a required dependency while clarifying that openlineage-airflow or apache-airflow-providers-openlineage are only needed for emitting OpenLineage events, not for Airflow Datasets/Assets functionality.

  • Added openlineage-integration-common as a core dependency in pyproject.toml
  • Refactored import logic to separate OpenLineage artifact processing from OpenLineage event emission
  • Updated variable names and comments to better distinguish between OpenLineage common functionality and Airflow provider functionality
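
The separation described in the bullets above can be pictured with a small availability check. This is a hedged sketch, not the code in `cosmos/operators/local.py`: the function names `is_installed` and `lineage_capabilities` are hypothetical, and the module paths (`openlineage.common`, `openlineage.airflow`, `airflow.providers.openlineage`) are assumed to be the import roots of the three packages discussed in this PR.

```python
from importlib.util import find_spec
from typing import Callable


def is_installed(module_name: str) -> bool:
    """Return True if the module can be found in the current environment."""
    try:
        return find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Raised when a parent package is missing or has no import spec.
        return False


def lineage_capabilities(probe: Callable[[str], bool] = is_installed) -> dict[str, bool]:
    """Report which features are available (hypothetical helper, not Cosmos API)."""
    # Assumed import root of openlineage-integration-common.
    has_common = probe("openlineage.common")
    # Assumed import roots of the two packages that can emit OpenLineage events.
    has_emitter = probe("airflow.providers.openlineage") or probe("openlineage.airflow")
    return {
        # Datasets/Assets only need the artifact processing from the common package.
        "emit_airflow_assets": has_common,
        # OpenLineage events additionally need one of the Airflow integrations.
        "emit_openlineage_events": has_common and has_emitter,
    }


# Simulate an environment where only openlineage-integration-common is installed.
only_common = lineage_capabilities(lambda name: name == "openlineage.common")
print(only_common)  # {'emit_airflow_assets': True, 'emit_openlineage_events': False}
```

Passing the probe as a parameter keeps the decision logic testable without installing or uninstalling any of the packages.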

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| pyproject.toml | Added openlineage-integration-common as required dependency |
| cosmos/operators/local.py | Refactored OpenLineage imports and clarified distinction between artifact processing and event emission |


Comment thread cosmos/operators/local.py Outdated

codecov Bot commented Oct 13, 2025

Codecov Report

❌ Patch coverage is 62.50000% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 97.79%. Comparing base (8436351) to head (ea1919f).
⚠️ Report is 2 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| cosmos/operators/local.py | 62.50% | 3 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2030      +/-   ##
==========================================
- Coverage   97.84%   97.79%   -0.06%     
==========================================
  Files          89       89              
  Lines        5624     5623       -1     
==========================================
- Hits         5503     5499       -4     
- Misses        121      124       +3     


Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Collaborator Author

tatiana commented Oct 13, 2025

BTW: The lines not covered by tests were already not covered by tests before.

Contributor

@pankajastro pankajastro left a comment


I’ve posted a few quick questions; otherwise, everything looks good. Thanks!

Comment thread dev/dags/performance_dag.py
Comment thread cosmos/operators/local.py Outdated
Comment thread pyproject.toml
Comment thread cosmos/operators/local.py
Comment thread cosmos/operators/local.py Outdated
@tatiana tatiana merged commit b4d8907 into main Oct 14, 2025
94 of 96 checks passed
@tatiana tatiana deleted the improve-assets branch October 14, 2025 11:56
@tatiana tatiana added this to the Cosmos 1.11.0 milestone Oct 28, 2025
@tatiana tatiana mentioned this pull request Oct 29, 2025
tatiana added a commit that referenced this pull request Oct 29, 2025
**Features**

* Introduce ``ExecutionMode.WATCHER`` to reduce DAG run time by 1/5 in
several PRs. Learn more about it
[here](https://astronomer.github.io/astronomer-cosmos/getting_started/watcher-execution-mode.html#watcher-execution-mode).
This feature was implemented via multiple PRs, including:
* Expose new execution mode by @tatiana @pankajastro @pankajkoti in
#1999
* Add ``DbtProducerWatcherOperator`` for the proposed
``ExecutionMode.WATCHER`` by @pankajkoti in #1982
* Add ``DbtConsumerWatcherSensor`` for the proposed
``ExecutionMode.WATCHER`` by @pankajastro in #1998
* Push producer's task completion status to XCOM by @pankajkoti in #2000
* Add default priority_weight for ``DbtProducerWatcherOperator`` by
@pankajkoti in #1995
* Add sample dbt events for the dbt watcher execution mode by
@pankajkoti in #1952
* Add ``compiled_sql`` as a template field on
``ExecutionMode.WATCHER`` when using ``run_results.json`` by
@pankajastro in #2070
* Set ``push_run_results_to_xcom`` kwargs correctly for invocation mode
subprocess and Watcher mode by @pankajastro in #2067
* Store compiled SQL as template field for dbt callback events in
``ExecutionMode.WATCHER`` by @pankajkoti in #2068
* Add initial documentation for ``ExecutionMode.WATCHER`` by @tatiana in
#2046
* Support running ``State.UPSTREAM_FAILED`` tasks when WATCHER consumer
upstream tasks fail by @tatiana in #2062
* Fail sensor tasks immediately if the ``ExecutionMode.WATCHER``
producer task fails by @pankajastro in #2040
  * Add ``WATCHER`` to GitHub issue template by @tatiana in #2056
* Add support for ``TestBehavior.AFTER_ALL`` with
``ExecutionMode.WATCHER`` by @pankajastro in #2049
* Add support for ``TestBehavior.NONE`` with ``ExecutionMode.WATCHER``
by @pankajastro in #2047
* Fix ``ExecutionMode.WATCHER`` behaviour with ``DbtTaskGroup`` by
@tatiana in #2044
* Fix Cosmos behaviour when using watcher with
``InvocationMode.DBT_RUNNER`` by @tatiana in #2048

* Add Airflow 3 plugin for dbt docs with multiple dbt projects support
by @pankajkoti in #2009, check the
[documentation](https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html).
* Initial support to ``dbt Fusion`` by @tatiana in #1803. More details
[here](https://astronomer.github.io/astronomer-cosmos/configuration/dbt-fusion).
* Support to prune sources without downstream references in dbt projects
by @corsettigyg in #1988
* Allow to set task display name as a user-defined function by
@corsettigyg in #1761
* Add dbt project's hash to dag docs to support dag versioning in
Airflow 3 by @pankajkoti in #1907
* feat: Add Jinja templating support for ``dbt_cmd_flags`` by
@skillicinski in #1899
* Add Scarf metric to collect the execution mode uses by @pankajastro in
#1981
* Support Airflow 3.1 by @tatiana in #1980
* Add MySQL profile mapping by @Lee2532 in #1977
* Add sqlserver profile mapping by @pankajastro in #1737

**Enhancement**

* Use XCom to store sql when using ``ExecutionMode.AIRFLOW_ASYNC`` by
@pankajastro in #1934
* Refactor ``AIRFLOW_ASYNC`` teardown so it doesn't install the
virtualenv by @pankajastro in #1938
* Reuse the virtual env for ``AIRFLOW_ASYNC`` setup task by @pankajastro
in #1939
* Improve dataset/asset experience in Cosmos by @tatiana in #2030
* Add ``downstreams`` to ``DbtNode`` by @wornjs in #2028

**Bug fixes**

* Fix tags extraction by @ms32035 in #1915
* Fix task flow operator args by @anyapriya in #2024

**Documentation**

* Add documentation for Airflow 3 Plugin supporting dbt docs for
multiple dbt projects by @pankajkoti in #2063
* Add Cosmos Deferrable Operator Guide by @pankajastro in #1922
* Add dbt Fusion documentation by @tatiana in #1824 #1830
* Update dbt-fusion.rst to explicitly highlight it is in alpha by
@tatiana in #1838
* Fix a bunch of docs build errors and warnings by @pankajkoti in
#1886
* Add docs note for param virtualenv_dir for async execution mode by
@pankajastro in #1969
* Use pepy.tech downloads badge in README by @pankajkoti in #1920
* Correct the default value of ``cache_dir`` by @seokyun.ha in #2027

**Others**

* Promote @corsettigyg to committer by @tatiana in #1985
* Add @pankajkoti and @pankajastro to ``contributors.rst`` by @tatiana
in #1983
* Update setup script for airflow3 script by @dwreeves in #2023
* Prevent pytest from trying to test classes that aren't actually tests
by @anyapriya in #2032
* Fix ``dag.test()`` for Airflow 3.1+ by syncing DAG to database by
@kaxil in #2037
* Disable Scarf in CI by @pankajastro in #2016
* Fix failing dbt Fusion tests when run in parallel in CI by @pankajkoti
in #1896
* Fix MyPy issues related to ``ObjectStoragePath`` in main branch by
@tatiana in #2012
* Cleanup example dbt event JSON dictionaries kept for XCOM reference by
@pankajkoti in #1997
* Bump min hatch version that includes fixes for click>=8.3.0 by
@pankajkoti in #1996
* Use official postgres image from Docker hub for kubernetes setup by
@pankajkoti in #1986
* Use click<8.3.0 for hatch as click 8.3 breaks hatch by @pankajkoti in
#1987
* Pin Airflow version in type check CI job by @pankajastro in #2003
* Improve comments after feedback on #1948 by @tatiana in #1963
* Fix running tests with dbt Fusion 2.0.0 preview versions by @tatiana
in #1948
* Test hardening of dbt node having tags as unset or missing by
@pankajkoti in #1918
* Fix Sphinx issue in the main branch by @tatiana in #2064
* pre-commit autoupdate in #2065, #2043, #2033, #2019, #1990, #2019,
#2008, #1941, #1935, #1924
* GitHub dependabot update in #2051, #2050, #2038, #2022, #1947, #1955,
#1946, #1944, #1945, #1928, #1921, #1917


Co-authored-by: Pankaj Koti <pankaj.koti@astronomer.io>
Co-authored-by: Pankaj Singh <pankaj.singh@astronomer.io>
Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>

3 participants