From 38fba4a5aa184e007111013a59a86232f0cd3e10 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 17 Feb 2026 12:01:08 +0000 Subject: [PATCH 1/2] Add integration tests for dbt Fusion and watcher Closes: https://github.com/astronomer/astronomer-cosmos/issues/2055 --- tests/test_dbtf.py | 80 ++++++++++++++++------------------------------ 1 file changed, 27 insertions(+), 53 deletions(-) diff --git a/tests/test_dbtf.py b/tests/test_dbtf.py index 1c3d24dc1f..e9752c8d06 100644 --- a/tests/test_dbtf.py +++ b/tests/test_dbtf.py @@ -5,7 +5,7 @@ from airflow.utils.state import DagRunState from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import InvocationMode +from cosmos.constants import ExecutionMode, InvocationMode from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping DBT_FUSION_BINARY = Path.home() / ".local/bin/dbt" @@ -34,56 +34,28 @@ render_config = RenderConfig(dbt_executable_path=DBT_FUSION_BINARY, invocation_mode=InvocationMode.SUBPROCESS) -execution_config = ExecutionConfig(dbt_executable_path=DBT_FUSION_BINARY, invocation_mode=InvocationMode.SUBPROCESS) - - -@pytest.mark.integration -@pytest.mark.dbtfusion -def test_dbt_snowflake_dag_with_dbt_fusion(): - """ - Run a DbtDag using dbt Fusion. - Confirm it succeeds and has the expected amount of both: - - dbt resources - - Airflow tasks - And that the tasks are in the expected topological order. - """ - snowflake_dag = DbtDag( - execution_config=execution_config, - project_config=project_config, - profile_config=snowflake_profile_config, - render_config=render_config, - start_date=datetime(2023, 1, 1), - dag_id="snowflake_dbt_fusion_dag", - tags=["profiles"], - ) - outcome = snowflake_dag.test() - assert outcome.state == DagRunState.SUCCESS - - assert len(snowflake_dag.dbt_graph.filtered_nodes) == 23 +local_execution_config = ExecutionConfig( + dbt_executable_path=DBT_FUSION_BINARY, invocation_mode=InvocationMode.SUBPROCESS +) - assert len(snowflake_dag.task_dict) == 13 - tasks_names = [task.task_id for task in snowflake_dag.topological_sort()] - expected_task_names = [ - "raw_customers_seed", - "raw_orders_seed", - "raw_payments_seed", - "stg_customers.run", - "stg_customers.test", - "stg_orders.run", - "stg_orders.test", - "stg_payments.run", - "stg_payments.test", - "customers.run", - "customers.test", - "orders.run", - "orders.test", - ] - assert tasks_names == expected_task_names +watcher_execution_config = ExecutionConfig( + execution_mode=ExecutionMode.WATCHER, + dbt_executable_path=DBT_FUSION_BINARY, + invocation_mode=InvocationMode.SUBPROCESS, +) +@pytest.mark.parametrize( + "dag_id,execution_config,profile_config", + [ + ("dbt_fusion_local_snowflake_dag", local_execution_config, snowflake_profile_config), + ("dbt_fusion_local_bigquery_dag", local_execution_config, bigquery_profile_config), + ("dbt_fusion_watcher_bigquery_dag", watcher_execution_config, bigquery_profile_config), + ], +) @pytest.mark.integration @pytest.mark.dbtfusion -def test_dbt_bigquery_dag_with_dbt_fusion(): +def test_dbt_fusion(dag_id, execution_config, profile_config): """ Run a DbtDag using dbt Fusion. Confirm it succeeds and has the expected amount of both: @@ -91,22 +63,21 @@ def test_dbt_bigquery_dag_with_dbt_fusion(): - Airflow tasks And that the tasks are in the expected topological order. """ - bigquery_dag = DbtDag( + dbt_fusion_dag = DbtDag( execution_config=execution_config, project_config=project_config, - profile_config=bigquery_profile_config, + profile_config=profile_config, render_config=render_config, start_date=datetime(2023, 1, 1), - dag_id="bigquery_dbt_fusion_dag", + dag_id=dag_id, tags=["profiles"], ) - outcome = bigquery_dag.test() + outcome = dbt_fusion_dag.test() assert outcome.state == DagRunState.SUCCESS - assert len(bigquery_dag.dbt_graph.filtered_nodes) == 23 + assert len(dbt_fusion_dag.dbt_graph.filtered_nodes) == 23 - assert len(bigquery_dag.task_dict) == 13 - tasks_names = [task.task_id for task in bigquery_dag.topological_sort()] + tasks_names = [task.task_id for task in dbt_fusion_dag.topological_sort()] expected_task_names = [ "raw_customers_seed", "raw_orders_seed", @@ -122,4 +93,7 @@ def test_dbt_bigquery_dag_with_dbt_fusion(): "orders.run", "orders.test", ] + if execution_config.execution_mode == ExecutionMode.WATCHER: + expected_task_names.insert(0, "dbt_producer_watcher") + assert tasks_names == expected_task_names From fe19efaff42bc6a170e951f448318d0c00edbf3c Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 17 Feb 2026 18:08:24 +0000 Subject: [PATCH 2/2] Attempt to solve watcher test that is hanging --- tests/test_dbtf.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/test_dbtf.py b/tests/test_dbtf.py index e9752c8d06..d9ccdd936b 100644 --- a/tests/test_dbtf.py +++ b/tests/test_dbtf.py @@ -1,3 +1,4 @@ +import os from datetime import datetime from pathlib import Path @@ -63,6 +64,11 @@ def test_dbt_fusion(dag_id, execution_config, profile_config): - Airflow tasks And that the tasks are in the expected topological order. """ + if os.getenv("CI"): + operator_args = {"trigger_rule": "all_success"} + else: + operator_args = {} + dbt_fusion_dag = DbtDag( execution_config=execution_config, project_config=project_config, @@ -71,6 +77,7 @@ def test_dbt_fusion(dag_id, execution_config, profile_config): start_date=datetime(2023, 1, 1), dag_id=dag_id, tags=["profiles"], + operator_args=operator_args, ) outcome = dbt_fusion_dag.test() assert outcome.state == DagRunState.SUCCESS