From 1f1d4cc7de24e2b3cbe9079ffe2e34b59e671006 Mon Sep 17 00:00:00 2001 From: Michal Mrazek Date: Tue, 2 Dec 2025 21:02:11 +0100 Subject: [PATCH 1/5] test: add Airflow 3 DAG versioning tests for Cosmos --- tests/test_airflow_versioning.py | 198 +++++++++++++++++++++++++++++++ 1 file changed, 198 insertions(+) create mode 100644 tests/test_airflow_versioning.py diff --git a/tests/test_airflow_versioning.py b/tests/test_airflow_versioning.py new file mode 100644 index 0000000000..a7fdc25a24 --- /dev/null +++ b/tests/test_airflow_versioning.py @@ -0,0 +1,198 @@ +import shutil +from pathlib import Path + +import pytest + +from cosmos.constants import AIRFLOW_VERSION + +if AIRFLOW_VERSION.major < 3: + pytest.skip("Skipping Airflow versioning tests on Airflow 2.x", allow_module_level=True) +else: + # Only import these when Airflow 3.x is present + from airflow.models import DagBag, DagRun, TaskInstance + from airflow.models.dag import DagModel + from airflow.models.dag_version import DagVersion + from airflow.models.dagbundle import DagBundleModel + from airflow.models.dagcode import DagCode + from airflow.models.serialized_dag import SerializedDagModel + from airflow.sdk import timezone + from airflow.serialization.serialized_objects import LazyDeserializedDAG + from airflow.utils.session import provide_session + from airflow.utils.state import DagRunState, TaskInstanceState + + +DBT_PROJECT_PATH = Path(__file__).parent.parent / "dev" / "dags" / "dbt" / "jaffle_shop" + + +@pytest.fixture +def jaffle_shop_test_dir(tmp_path): + """Create a test directory with the local jaffle shop project and DAG file.""" + test_path = tmp_path / "jaffle_shop" + test_path.mkdir() + shutil.copytree(DBT_PROJECT_PATH, test_path, dirs_exist_ok=True) + + dag_file = test_path / "basic_cosmos_dag.py" + basic_dag_path = Path(__file__).parent.parent / "dev" / "dags" / "basic_cosmos_dag.py" + dag_content = basic_dag_path.read_text().replace( + "DBT_PROJECT_PATH = DBT_ROOT_PATH / DBT_PROJECT_NAME", f'DBT_PROJECT_PATH = Path("{test_path}")' + ) + dag_file.write_text(dag_content) + + return test_path + + +@pytest.fixture +def test_dag_id(): + """DAG ID used in versioning tests.""" + return "basic_cosmos_dag" + + +@pytest.fixture +def dag_version_cleaner(test_dag_id): + """Fixture to clean up DAG versioning data before and after tests.""" + + @provide_session + def _cleanup(dag_id, session=None): + """Clean up all Airflow 3 versioning-related data for a DAG.""" + try: + for table in [TaskInstance, DagRun, SerializedDagModel, DagVersion, DagCode, DagModel]: + session.query(table).filter(table.dag_id == dag_id).delete() + session.commit() + except Exception: + session.rollback() + raise + + _cleanup(test_dag_id) + yield + _cleanup(test_dag_id) + + +@pytest.fixture +def serialize_dag(): + """Factory to serialize DAG.""" + + @provide_session + def _serialize(dag, bundle_name="test_bundle", bundle_version="1.0.0", session=None): + """Serialize DAG.""" + # Ensure bundle exists + if not session.get(DagBundleModel, bundle_name): + session.add(DagBundleModel(name=bundle_name)) + session.flush() + + # Ensure DagModel exists + if not session.get(DagModel, dag.dag_id): + session.add(DagModel(dag_id=dag.dag_id, bundle_name=bundle_name)) + session.flush() + + # Serialize DAG (uses scheduler's hash-based versioning) + return SerializedDagModel.write_dag( + dag=LazyDeserializedDAG.from_dag(dag), + bundle_name=bundle_name, + bundle_version=bundle_version, + session=session, + ) + + return _serialize + + +@pytest.fixture +def task_instance_creator(): + """Factory to create task instances.""" + + @provide_session + def _create(dag, dag_version, session=None): + """Create a DagRun and Success TaskInstance.""" + logical_date = timezone.utcnow() + dag_run = DagRun( + dag_id=dag.dag_id, + run_id=f"test_{logical_date.isoformat()}", + logical_date=logical_date, + state=DagRunState.SUCCESS, + run_type="manual", + ) + dag_run.dag_version = dag_version + session.add(dag_run) + session.flush() + + task = list(dag.tasks)[0] + task_instance = TaskInstance( + task=task, + run_id=dag_run.run_id, + dag_version_id=dag_version.id, + state=TaskInstanceState.SUCCESS, + ) + session.add(task_instance) + session.commit() + return task_instance + + return _create + + +@pytest.mark.integration +def test_cosmos_dag_version_tracking_with_added_model( + jaffle_shop_test_dir, test_dag_id, dag_version_cleaner, serialize_dag, task_instance_creator +): + """Test that DAG versions increment when dbt models are added. + + This test verifies that Airflow 3's DAG versioning system correctly tracks + structural changes to Cosmos DAGs when dbt models are added. + """ + # Parse DAG and serialize + dagbag = DagBag(dag_folder=jaffle_shop_test_dir) + dag_v1 = dagbag.dags[test_dag_id] + + serialize_dag(dag_v1) + version_1 = DagVersion.get_latest_version(test_dag_id) + + task_instance_creator(dag_v1, version_1) + + # Add new dbt model (cosmos dag should change) + (jaffle_shop_test_dir / "models" / "new_model.sql").write_text("select 1 as id;") + + # Re-parse DAG and serialize + dagbag = DagBag(dag_folder=jaffle_shop_test_dir) + dag_v2 = dagbag.dags[test_dag_id] + + serialize_dag(dag_v2) + version_2 = DagVersion.get_latest_version(test_dag_id) + + # Verify version incremented + assert version_1.version_number == 1 + assert version_2.version_number == 2 + assert version_1.id != version_2.id + + # Verify structural change (new task added) + assert len(dag_v2.tasks) == len(dag_v1.tasks) + 1 + + +@pytest.mark.integration +def test_cosmos_dag_version_unchanged_without_modifications( + jaffle_shop_test_dir, test_dag_id, dag_version_cleaner, serialize_dag, task_instance_creator +): + """Test that DAG version does not change when DAG structure is unchanged. + + This test verifies that Airflow 3's DAG versioning system recognizes + when there are no structural changes to the DAG. + """ + # Parse DAG and serialize + dagbag = DagBag(dag_folder=jaffle_shop_test_dir) + dag_v1 = dagbag.dags[test_dag_id] + + serialize_dag(dag_v1) + version_1 = DagVersion.get_latest_version(test_dag_id) + + task_instance_creator(dag_v1, version_1) + + # Re-parse DAG and serialize + dagbag = DagBag(dag_folder=jaffle_shop_test_dir) + dag_v2 = dagbag.dags[test_dag_id] + + serialize_dag(dag_v2) + version_2 = DagVersion.get_latest_version(test_dag_id) + + # Verify version did not increment (same version reused) + assert version_1.id == version_2.id + assert version_1.version_number == version_2.version_number + + # Verify same number of tasks + assert len(dag_v2.tasks) == len(dag_v1.tasks) From 15e23c7410e141d9acd6f2430dc5fdb227a860ed Mon Sep 17 00:00:00 2001 From: Michal Mrazek Date: Tue, 2 Dec 2025 21:23:19 +0100 Subject: [PATCH 2/5] improv db handling --- tests/test_airflow_versioning.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/tests/test_airflow_versioning.py b/tests/test_airflow_versioning.py index a7fdc25a24..9764d235cc 100644 --- a/tests/test_airflow_versioning.py +++ b/tests/test_airflow_versioning.py @@ -28,7 +28,6 @@ def jaffle_shop_test_dir(tmp_path): """Create a test directory with the local jaffle shop project and DAG file.""" test_path = tmp_path / "jaffle_shop" - test_path.mkdir() shutil.copytree(DBT_PROJECT_PATH, test_path, dirs_exist_ok=True) dag_file = test_path / "basic_cosmos_dag.py" @@ -56,7 +55,7 @@ def _cleanup(dag_id, session=None): """Clean up all Airflow 3 versioning-related data for a DAG.""" try: for table in [TaskInstance, DagRun, SerializedDagModel, DagVersion, DagCode, DagModel]: - session.query(table).filter(table.dag_id == dag_id).delete() + session.query(table).filter(table.dag_id == dag_id).delete(synchronize_session="fetch") session.commit() except Exception: session.rollback() @@ -74,15 +73,10 @@ def serialize_dag(): @provide_session def _serialize(dag, bundle_name="test_bundle", bundle_version="1.0.0", session=None): """Serialize DAG.""" - # Ensure bundle exists - if not session.get(DagBundleModel, bundle_name): - session.add(DagBundleModel(name=bundle_name)) - session.flush() - - # Ensure DagModel exists - if not session.get(DagModel, dag.dag_id): - session.add(DagModel(dag_id=dag.dag_id, bundle_name=bundle_name)) - session.flush() + # Ensure bundle exists atomically + session.merge(DagBundleModel(name=bundle_name)) + # Ensure DagModel exists atomically + session.merge(DagModel(dag_id=dag.dag_id, bundle_name=bundle_name)) # Serialize DAG (uses scheduler's hash-based versioning) return SerializedDagModel.write_dag( From 8089e8c5bef30cceffc7d41be566d38fd3a89ebe Mon Sep 17 00:00:00 2001 From: Michal Mrazek Date: Sun, 7 Dec 2025 11:59:41 +0100 Subject: [PATCH 3/5] simple project and run the dag --- tests/test_airflow_versioning.py | 93 +++++++++++--------------------- 1 file changed, 32 insertions(+), 61 deletions(-) diff --git a/tests/test_airflow_versioning.py b/tests/test_airflow_versioning.py index 9764d235cc..b67701b4b0 100644 --- a/tests/test_airflow_versioning.py +++ b/tests/test_airflow_versioning.py @@ -4,6 +4,7 @@ import pytest from cosmos.constants import AIRFLOW_VERSION +from tests.utils import run_dag if AIRFLOW_VERSION.major < 3: pytest.skip("Skipping Airflow versioning tests on Airflow 2.x", allow_module_level=True) @@ -15,25 +16,28 @@ from airflow.models.dagbundle import DagBundleModel from airflow.models.dagcode import DagCode from airflow.models.serialized_dag import SerializedDagModel - from airflow.sdk import timezone from airflow.serialization.serialized_objects import LazyDeserializedDAG from airflow.utils.session import provide_session - from airflow.utils.state import DagRunState, TaskInstanceState -DBT_PROJECT_PATH = Path(__file__).parent.parent / "dev" / "dags" / "dbt" / "jaffle_shop" +DBT_PROJECT_PATH = Path(__file__).parent.parent / "dev" / "dags" / "dbt" / "simple" @pytest.fixture -def jaffle_shop_test_dir(tmp_path): - """Create a test directory with the local jaffle shop project and DAG file.""" - test_path = tmp_path / "jaffle_shop" +def dbt_project_test_dir(tmp_path): + """Create a test directory with the local simple dbt project and DAG file.""" + test_path = tmp_path / "simple" shutil.copytree(DBT_PROJECT_PATH, test_path, dirs_exist_ok=True) dag_file = test_path / "basic_cosmos_dag.py" basic_dag_path = Path(__file__).parent.parent / "dev" / "dags" / "basic_cosmos_dag.py" - dag_content = basic_dag_path.read_text().replace( - "DBT_PROJECT_PATH = DBT_ROOT_PATH / DBT_PROJECT_NAME", f'DBT_PROJECT_PATH = Path("{test_path}")' + dag_content = ( + basic_dag_path.read_text() + .replace("DBT_PROJECT_PATH = DBT_ROOT_PATH / DBT_PROJECT_NAME", f'DBT_PROJECT_PATH = Path("{test_path}")') + .replace( + 'DBT_PROJECT_NAME = os.getenv("DBT_PROJECT_NAME", "jaffle_shop")', + 'DBT_PROJECT_NAME = "simple"', + ) ) dag_file.write_text(dag_content) @@ -89,71 +93,36 @@ def _serialize(dag, bundle_name="test_bundle", bundle_version="1.0.0", session=N return _serialize -@pytest.fixture -def task_instance_creator(): - """Factory to create task instances.""" - - @provide_session - def _create(dag, dag_version, session=None): - """Create a DagRun and Success TaskInstance.""" - logical_date = timezone.utcnow() - dag_run = DagRun( - dag_id=dag.dag_id, - run_id=f"test_{logical_date.isoformat()}", - logical_date=logical_date, - state=DagRunState.SUCCESS, - run_type="manual", - ) - dag_run.dag_version = dag_version - session.add(dag_run) - session.flush() - - task = list(dag.tasks)[0] - task_instance = TaskInstance( - task=task, - run_id=dag_run.run_id, - dag_version_id=dag_version.id, - state=TaskInstanceState.SUCCESS, - ) - session.add(task_instance) - session.commit() - return task_instance - - return _create - - @pytest.mark.integration def test_cosmos_dag_version_tracking_with_added_model( - jaffle_shop_test_dir, test_dag_id, dag_version_cleaner, serialize_dag, task_instance_creator + dbt_project_test_dir, test_dag_id, dag_version_cleaner, serialize_dag ): """Test that DAG versions increment when dbt models are added. This test verifies that Airflow 3's DAG versioning system correctly tracks structural changes to Cosmos DAGs when dbt models are added. """ - # Parse DAG and serialize - dagbag = DagBag(dag_folder=jaffle_shop_test_dir) + # Parse DAG and run + dagbag = DagBag(dag_folder=dbt_project_test_dir) dag_v1 = dagbag.dags[test_dag_id] + run_dag(dag_v1) - serialize_dag(dag_v1) version_1 = DagVersion.get_latest_version(test_dag_id) - task_instance_creator(dag_v1, version_1) - # Add new dbt model (cosmos dag should change) - (jaffle_shop_test_dir / "models" / "new_model.sql").write_text("select 1 as id;") + (dbt_project_test_dir / "models" / "dummy_model.sql").write_text("SELECT 2 AS id, 'example' AS name") - # Re-parse DAG and serialize - dagbag = DagBag(dag_folder=jaffle_shop_test_dir) + # Re-parse DAG, serialize and run + dagbag = DagBag(dag_folder=dbt_project_test_dir) dag_v2 = dagbag.dags[test_dag_id] - serialize_dag(dag_v2) + run_dag(dag_v2) + version_2 = DagVersion.get_latest_version(test_dag_id) # Verify version incremented assert version_1.version_number == 1 assert version_2.version_number == 2 - assert version_1.id != version_2.id # Verify structural change (new task added) assert len(dag_v2.tasks) == len(dag_v1.tasks) + 1 @@ -161,27 +130,29 @@ def test_cosmos_dag_version_tracking_with_added_model( @pytest.mark.integration def test_cosmos_dag_version_unchanged_without_modifications( - jaffle_shop_test_dir, test_dag_id, dag_version_cleaner, serialize_dag, task_instance_creator + dbt_project_test_dir, + test_dag_id, + dag_version_cleaner, + serialize_dag, ): """Test that DAG version does not change when DAG structure is unchanged. This test verifies that Airflow 3's DAG versioning system recognizes when there are no structural changes to the DAG. """ - # Parse DAG and serialize - dagbag = DagBag(dag_folder=jaffle_shop_test_dir) + # Parse DAG and run + dagbag = DagBag(dag_folder=dbt_project_test_dir) dag_v1 = dagbag.dags[test_dag_id] + run_dag(dag_v1) - serialize_dag(dag_v1) version_1 = DagVersion.get_latest_version(test_dag_id) - task_instance_creator(dag_v1, version_1) - - # Re-parse DAG and serialize - dagbag = DagBag(dag_folder=jaffle_shop_test_dir) + # Re-parse DAG, serialize and run + dagbag = DagBag(dag_folder=dbt_project_test_dir) dag_v2 = dagbag.dags[test_dag_id] - serialize_dag(dag_v2) + run_dag(dag_v2) + version_2 = DagVersion.get_latest_version(test_dag_id) # Verify version did not increment (same version reused) From 502be07def2ea46669779555ae55daf185785d04 Mon Sep 17 00:00:00 2001 From: Michal Mrazek Date: Mon, 26 Jan 2026 20:14:55 +0100 Subject: [PATCH 4/5] Add Airflow 3.1+ DAG versioning integration tests --- tests/test_airflow_versioning.py | 52 ++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/tests/test_airflow_versioning.py b/tests/test_airflow_versioning.py index b67701b4b0..0bbdeac4b2 100644 --- a/tests/test_airflow_versioning.py +++ b/tests/test_airflow_versioning.py @@ -2,14 +2,14 @@ from pathlib import Path import pytest +from packaging.version import Version from cosmos.constants import AIRFLOW_VERSION from tests.utils import run_dag -if AIRFLOW_VERSION.major < 3: - pytest.skip("Skipping Airflow versioning tests on Airflow 2.x", allow_module_level=True) +if AIRFLOW_VERSION < Version("3.1"): + pytest.skip("Skipping Airflow versioning tests on Airflow 2.x and 3.0", allow_module_level=True) else: - # Only import these when Airflow 3.x is present from airflow.models import DagBag, DagRun, TaskInstance from airflow.models.dag import DagModel from airflow.models.dag_version import DagVersion @@ -161,3 +161,49 @@ def test_cosmos_dag_version_unchanged_without_modifications( # Verify same number of tasks assert len(dag_v2.tasks) == len(dag_v1.tasks) + + +@pytest.mark.integration +def test_cosmos_dag_version_unchanged_with_non_structural_changes( + dbt_project_test_dir, + test_dag_id, + dag_version_cleaner, + serialize_dag, +): + """Test that DAG version does not change when non-structural files change. + + This test verifies that Airflow 3's DAG versioning system correctly identifies + that changes to non-structural files (like README or adding unrelated files) + should not create a new DAG version. The DAG structure (tasks, dependencies) + remains the same. + """ + # Parse DAG and run + dagbag = DagBag(dag_folder=dbt_project_test_dir) + dag_v1 = dagbag.dags[test_dag_id] + run_dag(dag_v1) + + version_1 = DagVersion.get_latest_version(test_dag_id) + + # Add a README file to the project (should NOT affect DAG structure) + readme_file = dbt_project_test_dir / "README.md" + readme_file.write_text("# My dbt Project\n\nThis is a test project.") + + # Add a random data file (should NOT affect DAG structure) + data_file = dbt_project_test_dir / "data" / "sample.csv" + data_file.parent.mkdir(exist_ok=True) + data_file.write_text("id,name\n1,test\n2,example") + + # Re-parse DAG, serialize and run + dagbag = DagBag(dag_folder=dbt_project_test_dir) + dag_v2 = dagbag.dags[test_dag_id] + serialize_dag(dag_v2) + run_dag(dag_v2) + + version_2 = DagVersion.get_latest_version(test_dag_id) + + # Verify version did NOT increment (non-structural changes don't affect DAG) + assert version_1.id == version_2.id + assert version_1.version_number == version_2.version_number + + # Verify same number of tasks + assert len(dag_v2.tasks) == len(dag_v1.tasks) From 0612f362d428d2ae01e537af8dc58c81ade94339 Mon Sep 17 00:00:00 2001 From: Michal Mrazek Date: Thu, 29 Jan 2026 13:52:13 +0100 Subject: [PATCH 5/5] Fix airflow versioning test --- tests/test_airflow_versioning.py | 46 -------------------------------- 1 file changed, 46 deletions(-) diff --git a/tests/test_airflow_versioning.py b/tests/test_airflow_versioning.py index 0bbdeac4b2..3e7cb62cca 100644 --- a/tests/test_airflow_versioning.py +++ b/tests/test_airflow_versioning.py @@ -161,49 +161,3 @@ def test_cosmos_dag_version_unchanged_without_modifications( # Verify same number of tasks assert len(dag_v2.tasks) == len(dag_v1.tasks) - - -@pytest.mark.integration -def test_cosmos_dag_version_unchanged_with_non_structural_changes( - dbt_project_test_dir, - test_dag_id, - dag_version_cleaner, - serialize_dag, -): - """Test that DAG version does not change when non-structural files change. - - This test verifies that Airflow 3's DAG versioning system correctly identifies - that changes to non-structural files (like README or adding unrelated files) - should not create a new DAG version. The DAG structure (tasks, dependencies) - remains the same. - """ - # Parse DAG and run - dagbag = DagBag(dag_folder=dbt_project_test_dir) - dag_v1 = dagbag.dags[test_dag_id] - run_dag(dag_v1) - - version_1 = DagVersion.get_latest_version(test_dag_id) - - # Add a README file to the project (should NOT affect DAG structure) - readme_file = dbt_project_test_dir / "README.md" - readme_file.write_text("# My dbt Project\n\nThis is a test project.") - - # Add a random data file (should NOT affect DAG structure) - data_file = dbt_project_test_dir / "data" / "sample.csv" - data_file.parent.mkdir(exist_ok=True) - data_file.write_text("id,name\n1,test\n2,example") - - # Re-parse DAG, serialize and run - dagbag = DagBag(dag_folder=dbt_project_test_dir) - dag_v2 = dagbag.dags[test_dag_id] - serialize_dag(dag_v2) - run_dag(dag_v2) - - version_2 = DagVersion.get_latest_version(test_dag_id) - - # Verify version did NOT increment (non-structural changes don't affect DAG) - assert version_1.id == version_2.id - assert version_1.version_number == version_2.version_number - - # Verify same number of tasks - assert len(dag_v2.tasks) == len(dag_v1.tasks)