Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: test

on:
push: # Run on pushes to the default branch
branches: [main]
branches: [main, dbtf]
pull_request_target: # Also run on pull requests originated from forks
branches: [main]

Expand Down Expand Up @@ -424,6 +424,65 @@ jobs:
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH

Run-Integration-dbt-fusion-Tests:
  # Runs the dbt Fusion integration tests for every Python/Airflow combination
  # in the matrix below. Starts only after the `Authorize` job has completed.
  needs: Authorize
  runs-on: ubuntu-latest
  strategy:
    # Let the remaining matrix combinations finish even if one of them fails.
    fail-fast: false
    matrix:
      # Versions are quoted so that YAML does not read "3.10" as the float 3.1.
      python-version: ["3.10", "3.11"]
      airflow-version: ["2.10", "3.0"]
      dbt-version: ["2.0"]  # dbt Fusion

  steps:
    - uses: actions/checkout@v4
      with:
        # Use the pull-request head commit when there is one, otherwise the pushed ref.
        ref: ${{ github.event.pull_request.head.sha || github.ref }}

    - uses: actions/cache@v4
      with:
        path: |
          ~/.cache/pip
          .local/share/hatch/
        # One cache entry per OS / Python / Airflow / dbt combination, invalidated
        # whenever pyproject.toml or cosmos/__init__.py changes.
        key: integration-dbtf-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }}

    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v5
      with:
        python-version: ${{ matrix.python-version }}

    - name: Install packages and dependencies
      run: |
        python -m pip install uv
        uv pip install --system hatch
        hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze

    - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }}
      run: |
        hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-dbtf-setup
        hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-dbtf
      env:
        # Environment values are strings; quote the numeric-looking ones explicitly.
        AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: "0"
        AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
        AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: "90.0"
        PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
        # Snowflake credentials come from repository secrets.
        SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
        SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
        SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
        SNOWFLAKE_SCHEMA: ${{ secrets.SNOWFLAKE_SCHEMA }}
        SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }}
        SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }}

    - name: Upload coverage to Github
      uses: actions/upload-artifact@v4
      with:
        name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}
        path: .coverage
        # .coverage is a dotfile, so it is only uploaded when hidden files are included.
        include-hidden-files: true


Run-Performance-Tests:
needs: Authorize
runs-on: ubuntu-latest
Expand Down
12 changes: 11 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
Changelog
=========

1.10.1 (2025-05-21)
1.11.0a1 (2025-06-23)
---------------------

Feature

* Initial support for ``dbt Fusion`` by @tatiana in #1803. `More details here. <https://astronomer.github.io/astronomer-cosmos/configuration/dbt-fusion>`_.

(many other features, pending details)


1.10.1 (2025-05-21)
-------------------

Bug Fixes

* Fix ``full_refresh`` parameter in ``AIRFLOW_ASYNC`` ``ExecutionConfig`` mode by @tuantran0910 in #1738
Expand Down
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from cosmos import settings

__version__ = "1.10.2a1"
__version__ = "1.11.0a1"

if not settings.enable_memory_optimised_imports:
from cosmos.airflow.dag import DbtDag
Expand Down
51 changes: 32 additions & 19 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,7 @@ def create_test_task_metadata(
if test_indirect_selection != TestIndirectSelection.EAGER:
task_args["indirect_selection"] = test_indirect_selection.value
if node is not None:
if node.resource_type == DbtResourceType.MODEL:
task_args["models"] = node.resource_name
elif node.resource_type == DbtResourceType.SOURCE:
if node.resource_type == DbtResourceType.SOURCE:
task_args["select"] = f"source:{node.resource_name}"
elif is_detached_test(node):
task_args["select"] = node.resource_name.split(".")[0]
Comment thread
tatiana marked this conversation as resolved.
Expand All @@ -163,7 +161,6 @@ def create_test_task_metadata(

extra_context = {"dbt_node_config": node.context_dict}
task_owner = node.owner

elif render_config is not None: # TestBehavior.AFTER_ALL
task_args["select"] = render_config.select
task_args["selector"] = render_config.selector
Expand Down Expand Up @@ -285,34 +282,45 @@ def create_task_metadata(
"""
dbt_resource_to_class = create_dbt_resource_to_class(test_behavior)

args = {**args, **{"models": node.resource_name}}
# Make a copy to avoid issues with mutable arguments
args = {**args}

if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class:
extra_context: dict[str, Any] = {
"dbt_node_config": node.context_dict,
"dbt_dag_task_group_identifier": dbt_dag_task_group_identifier,
"package_name": node.package_name,
}
resource_suffix_map = {TestBehavior.BUILD: "build", DbtResourceType.MODEL: "run"}
resource_suffix = (
resource_suffix_map.get(test_behavior)
or resource_suffix_map.get(node.resource_type)
or node.resource_type.value
)
# Since Cosmos 1.11, models are selected with `--select` instead of `--models`, because `--models`
# was deprecated in dbt-core 1.10 (https://github.com/dbt-labs/dbt-core/issues/11561)
# and is not supported by dbt fusion (2.0.0-beta26).
# Users can still force Cosmos to use `--models` by setting the environment variable
# `AIRFLOW__COSMOS__PRE_DBT_FUSION=1`.
models_select_key = "models" if settings.pre_dbt_fusion else "select"

if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES:
args[models_select_key] = f"{node.resource_name}"
if test_indirect_selection != TestIndirectSelection.EAGER:
args["indirect_selection"] = test_indirect_selection.value
args["on_warning_callback"] = on_warning_callback
exclude_detached_tests_if_needed(node, args, detached_from_parent)
task_id, args = _get_task_id_and_args(
node,
args,
use_task_group,
normalize_task_id,
normalize_task_display_name,
"build",
node=node,
args=args,
use_task_group=use_task_group,
normalize_task_id=normalize_task_id,
normalize_task_display_name=normalize_task_display_name,
resource_suffix=resource_suffix,
include_resource_type=True,
)
elif node.resource_type == DbtResourceType.MODEL:
task_id, args = _get_task_id_and_args(
node, args, use_task_group, normalize_task_id, normalize_task_display_name, "run"
)
elif node.resource_type == DbtResourceType.SOURCE:
args["select"] = f"source:{node.resource_name}"
args["on_warning_callback"] = on_warning_callback

if (source_rendering_behavior == SourceRenderingBehavior.NONE) or (
Expand All @@ -321,8 +329,7 @@ def create_task_metadata(
and node.has_test is False
):
return None
args["select"] = f"source:{node.resource_name}"
args.pop("models")

task_id, args = _get_task_id_and_args(
node, args, use_task_group, normalize_task_id, normalize_task_display_name, "source"
)
Expand All @@ -334,9 +341,15 @@ def create_task_metadata(
else:
args = {}
return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args)
else:
else: # DbtResourceType.MODEL, DbtResourceType.SEED and DbtResourceType.SNAPSHOT
args[models_select_key] = node.resource_name
task_id, args = _get_task_id_and_args(
node, args, use_task_group, normalize_task_id, normalize_task_display_name, node.resource_type.value
node=node,
args=args,
use_task_group=use_task_group,
normalize_task_id=normalize_task_id,
normalize_task_display_name=normalize_task_display_name,
resource_suffix=resource_suffix,
)

_override_profile_if_needed(args, node.profile_config_to_override)
Expand Down
21 changes: 18 additions & 3 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
package_name=node_dict.get("package_name"),
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=base_path / node_dict["original_file_path"],
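# dbt-core defines the node path via "original_file_path"; dbt fusion identifies it via "path".
# NOTE(review): the subscript below raises KeyError when "original_file_path" is absent, so the
# "path" fallback only applies when that key is present but empty - consider using .get() for both.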
file_path=base_path / (node_dict["original_file_path"] or node_dict.get("path")),
Comment thread
tatiana marked this conversation as resolved.
tags=node_dict.get("tags", []),
config=node_dict.get("config", {}),
has_freshness=(
Expand Down Expand Up @@ -565,7 +566,16 @@ def run_dbt_ls(
self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]
) -> dict[str, DbtNode]:
"""Runs dbt ls command and returns the parsed nodes."""
if self.render_config.source_rendering_behavior != SourceRenderingBehavior.NONE:

# By default, `dbt ls --output json` in dbt fusion 2.0.0b26 returns fewer keys than in dbt-core 1.10.
# Default keys returned by dbt-core: ['name', 'resource_type', 'package_name', 'original_file_path', 'unique_id', 'alias', 'config', 'tags', 'depends_on']
# Default keys returned by dbt fusion: ['name', 'package_name', 'path', 'resource_type', 'unique_id']
# Users can force previous Cosmos behaviour by setting pre_dbt_fusion to True.
specify_output_keys = (
not settings.pre_dbt_fusion or self.render_config.source_rendering_behavior != SourceRenderingBehavior.NONE
)

if specify_output_keys:
ls_command = [
dbt_cmd,
"ls",
Expand All @@ -582,7 +592,12 @@ def run_dbt_ls(
"freshness",
]
else:
ls_command = [dbt_cmd, "ls", "--output", "json"]
ls_command = [
dbt_cmd,
"ls",
"--output",
"json",
]

ls_args = self.dbt_ls_args
ls_command.extend(self.local_flags)
Expand Down
1 change: 1 addition & 0 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile")
virtualenv_max_retries_lock = conf.getint("cosmos", "virtualenv_max_retries_lock", fallback=120)
default_copy_dbt_packages = conf.getboolean("cosmos", "default_copy_dbt_packages", fallback=False)
pre_dbt_fusion = conf.getboolean("cosmos", "pre_dbt_fusion", fallback=False)

# Experimentally adding `remote_cache_dir` as a separate entity in the Cosmos 1.6 release to gather feedback.
# This will be merged with the `cache_dir` config parameter in upcoming releases.
Expand Down
12 changes: 12 additions & 0 deletions dev/dags/dbt/jaffle_shop/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ default:
schema: "{{ env_var('POSTGRES_SCHEMA') }}"
threads: 4

# Snowflake profile with a single `dev` target. Every connection setting is
# read through env_var() from a SNOWFLAKE_* environment variable, so no
# credential is stored in this file.
snowflake_profile:
target: dev
outputs:
dev:
type: snowflake
account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
user: "{{ env_var('SNOWFLAKE_USER') }}"
password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
schema: "{{ env_var('SNOWFLAKE_SCHEMA') }}"
warehouse: "{{ env_var('SNOWFLAKE_WAREHOUSE') }}"
database: "{{ env_var('SNOWFLAKE_DATABASE') }}"

postgres_profile:
target: dev
outputs:
Expand Down
9 changes: 6 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow}
[[tool.hatch.envs.tests.matrix]]
python = ["3.8", "3.9", "3.10", "3.11", "3.12"]
airflow = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "3.0"]
dbt = ["1.5", "1.6", "1.7", "1.8", "1.9"]
dbt = ["1.5", "1.6", "1.7", "1.8", "1.9", "2.0"]

[tool.hatch.envs.tests.overrides]
matrix.airflow.dependencies = [
Expand All @@ -180,13 +180,16 @@ matrix.airflow.dependencies = [
freeze = "pip freeze"
test = 'sh scripts/test/unit.sh'
test-cov = 'sh scripts/test/unit-cov.sh'
test-integration-setup = 'sh scripts/test/integration-setup.sh {matrix:dbt}'
test-integration = 'sh scripts/test/integration.sh'
test-kubernetes = "sh scripts/test/integration-kubernetes.sh"
test-kubernetes-setup = "sh scripts/test/kubernetes-setup.sh {matrix:dbt}"
test-integration-dbtf-setup = 'sh scripts/test/integration-dbtf-setup.sh'
test-integration-dbtf = 'sh scripts/test/integration-dbtf.sh'
test-integration-dbt-1-5-4 = 'sh scripts/test/integration-dbt-1-5-4.sh'
test-integration-dbt-async = 'sh scripts/test/integration-dbt-async.sh {matrix:dbt}'
test-integration-expensive = 'sh scripts/test/integration-expensive.sh'
test-integration-setup = 'sh scripts/test/integration-setup.sh {matrix:dbt}'

test-performance = 'sh scripts/test/performance.sh'
test-performance-setup = 'sh scripts/test/performance-setup.sh {matrix:dbt}'
type-check = "pre-commit run mypy --files cosmos/**/*"
Expand All @@ -195,7 +198,7 @@ type-check = "pre-commit run mypy --files cosmos/**/*"
addopts = "--ignore-glob=**/dbt_packages/*"
filterwarnings = ["ignore::DeprecationWarning"]
minversion = "6.0"
markers = ["integration", "perf"]
markers = ["integration", "perf", "dbtfusion"]

######################################
# DOCS
Expand Down
2 changes: 1 addition & 1 deletion scripts/test/integration-dbt-1-5-4.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pytest -vv \
--cov=cosmos \
--cov-report=term-missing \
--cov-report=xml \
-m integration \
-m 'integration and not dbtFusion' \
--ignore=tests/perf \
--ignore=tests/test_example_k8s_dags.py \
-k 'basic_cosmos_task_group'
28 changes: 28 additions & 0 deletions scripts/test/integration-dbtf-setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
#
# Prepares the environment for the dbt Fusion integration tests:
#   1. uninstalls the pip-installed dbt-core and dbt adapters,
#   2. resets the Airflow metadata database,
#   3. migrates (Airflow 3+) or initialises (Airflow 2) that database.
#
# Any positional arguments are ignored.

set -v
set -x
set -e

# Remove the Python dbt distributions.
# NOTE(review): presumably so the tests exercise the dbt Fusion binary rather
# than dbt-core - confirm.
pip uninstall -y 'dbt-bigquery' 'dbt-databricks' 'dbt-duckdb' 'dbt-postgres' 'dbt-vertica' 'dbt-core'

# Delete any airflow.* files left in the current directory.
rm -rf airflow.*

# Log the installed Airflow distributions. Under `set -e` the script stops here
# if grep finds none.
pip freeze | grep airflow
airflow db reset -y

AIRFLOW_VERSION=$(airflow version)
AIRFLOW_MAJOR_VERSION=$(echo "$AIRFLOW_VERSION" | cut -d. -f1)
if [ "$AIRFLOW_MAJOR_VERSION" -ge 3 ]; then
    # Quoted so the shell always passes the requirement specifier through verbatim.
    # NOTE(review): the reason for excluding cadwyn 5.4.0 is not recorded here.
    uv pip install 'cadwyn!=5.4.0'
    echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db migrate'..."
    airflow db migrate
else
    echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db init'..."
    airflow db init
fi

uv pip freeze
26 changes: 26 additions & 0 deletions scripts/test/integration-dbtf.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/bash
#
# Runs the dbt Fusion integration tests (tests/test_dbtf.py) with coverage.
# Installs dbt Fusion first, using the installer published on the dbt CDN.

set -x
set -e
set -v

export SOURCE_RENDERING_BEHAVIOR=all

# Diagnostics: installed Airflow distributions and the contents of AIRFLOW_HOME.
# Under `set -e` the script stops here if grep finds no Airflow distribution.
pip freeze | grep airflow
echo "$AIRFLOW_HOME"
# Quoted so a path containing spaces is not split. Falls back to the current
# directory when AIRFLOW_HOME is unset, which is what the unquoted form did.
ls "${AIRFLOW_HOME:-.}"
airflow db check

# Drop previously installed dbt packages of the jaffle_shop project.
rm -rf dbt/jaffle_shop/dbt_packages


# Note: the dbt Fusion Engine is in Beta! Bugs and missing functionality compared to dbt Core will be resolved
# continuously in the lead-up to a final release (see more details in https://github.com/dbt-labs/dbt-fusion)

# Install dbt Fusion.
# NOTE(review): no version is pinned in this command; 2.0.0-beta.26 is the
# version it installed on 23 June 2025.
curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update

pytest -vv \
    tests/test_dbtf.py \
    --cov=cosmos \
    --cov-report=term-missing \
    --cov-report=xml \
    --durations=0
2 changes: 1 addition & 1 deletion scripts/test/integration-expensive.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pytest -vv \
--cov=cosmos \
--cov-report=term-missing \
--cov-report=xml \
-m integration \
-m 'integration and not dbtfusion' \
--ignore=tests/perf \
--ignore=tests/test_example_k8s_dags.py \
-k 'example_cosmos_python_models or example_virtualenv'
2 changes: 1 addition & 1 deletion scripts/test/integration-kubernetes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ pytest -vv \
--cov=cosmos \
--cov-report=term-missing \
--cov-report=xml \
-m integration \
-m 'integration and not dbtfusion' \
tests/test_example_k8s_dags.py
Loading