diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index db00599722..0364a0c3eb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main, dbtf] pull_request_target: # Also run on pull requests originated from forks branches: [main] @@ -424,6 +424,65 @@ jobs: AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH + Run-Integration-dbt-fusion-Tests: + needs: Authorize + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.10", "3.11"] + airflow-version: ["2.10", "3.0"] + dbt-version: ["2.0"] # dbt Fusion + exclude: + - python-version: "3.8" + airflow-version: "3.0" + + steps: + - uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha || github.ref }} + + - uses: actions/cache@v4 + with: + path: | + ~/.cache/pip + .local/share/hatch/ + key: integration-dbtf-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}--${{ matrix.dbt-version }}${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }} + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install packages and dependencies + run: | + python -m pip install uv + uv pip install --system hatch + hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} run pip freeze + + - name: Test Cosmos against Airflow ${{ matrix.airflow-version }}, Python ${{ matrix.python-version }} and dbt ${{ matrix.dbt-version }} + run: | + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-dbtf-setup + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ 
matrix.dbt-version }}:test-integration-dbtf + env: + AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0 + AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ + AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 + PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH + SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }} + SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }} + SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }} + SNOWFLAKE_SCHEMA: ${{ secrets.SNOWFLAKE_SCHEMA }} + SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }} + SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }} + + - name: Upload coverage to Github + uses: actions/upload-artifact@v4 + with: + name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }} + path: .coverage + include-hidden-files: true + Run-Performance-Tests: needs: Authorize runs-on: ubuntu-latest diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d4c919fda9..5c1302224e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,9 +1,19 @@ Changelog ========= -1.10.1 (2025-05-21) +1.11.0a1 (2025-06-23) --------------------- +Feature + +* Initial support for ``dbt Fusion`` by @tatiana in #1803. `More details here. `_. 
+ +(many other features, pending details) + + +1.10.1 (2025-05-21) +------------------- + Bug Fixes * Fix ``full_refresh`` parameter in ``AIRFLOW_ASYNC`` ``ExecutionConfig`` mode by @tuantran0910 in #1738 diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 48a1cd7aee..a86e2fa98e 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -9,7 +9,7 @@ from cosmos import settings -__version__ = "1.10.2a1" +__version__ = "1.11.0a1" if not settings.enable_memory_optimised_imports: from cosmos.airflow.dag import DbtDag diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 63b4d881ec..739e44d719 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -152,9 +152,7 @@ def create_test_task_metadata( if test_indirect_selection != TestIndirectSelection.EAGER: task_args["indirect_selection"] = test_indirect_selection.value if node is not None: - if node.resource_type == DbtResourceType.MODEL: - task_args["models"] = node.resource_name - elif node.resource_type == DbtResourceType.SOURCE: + if node.resource_type == DbtResourceType.SOURCE: task_args["select"] = f"source:{node.resource_name}" elif is_detached_test(node): task_args["select"] = node.resource_name.split(".")[0] @@ -163,7 +161,6 @@ def create_test_task_metadata( extra_context = {"dbt_node_config": node.context_dict} task_owner = node.owner - elif render_config is not None: # TestBehavior.AFTER_ALL task_args["select"] = render_config.select task_args["selector"] = render_config.selector @@ -285,7 +282,8 @@ def create_task_metadata( """ dbt_resource_to_class = create_dbt_resource_to_class(test_behavior) - args = {**args, **{"models": node.resource_name}} + # Make a copy to avoid issues with mutable arguments + args = {**args} if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class: extra_context: dict[str, Any] = { @@ -293,26 +291,36 @@ def create_task_metadata( "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, 
"package_name": node.package_name, } + resource_suffix_map = {TestBehavior.BUILD: "build", DbtResourceType.MODEL: "run"} + resource_suffix = ( + resource_suffix_map.get(test_behavior) + or resource_suffix_map.get(node.resource_type) + or node.resource_type.value + ) + # Since Cosmos 1.11, it selects models using --select, instead of --models. The reason for this is that + # this flag was deprecated in dbt-core 1.10 (https://github.com/dbt-labs/dbt-core/issues/11561) + # and dbt fusion (2.0.0-beta26) does not support it. + # Users can still force Cosmos to use `--models` by setting the environment variable + # `AIRFLOW__COSMOS__PRE_DBT_FUSION=1`. + models_select_key = "models" if settings.pre_dbt_fusion else "select" if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: + args[models_select_key] = f"{node.resource_name}" if test_indirect_selection != TestIndirectSelection.EAGER: args["indirect_selection"] = test_indirect_selection.value args["on_warning_callback"] = on_warning_callback exclude_detached_tests_if_needed(node, args, detached_from_parent) task_id, args = _get_task_id_and_args( - node, - args, - use_task_group, - normalize_task_id, - normalize_task_display_name, - "build", + node=node, + args=args, + use_task_group=use_task_group, + normalize_task_id=normalize_task_id, + normalize_task_display_name=normalize_task_display_name, + resource_suffix=resource_suffix, include_resource_type=True, ) - elif node.resource_type == DbtResourceType.MODEL: - task_id, args = _get_task_id_and_args( - node, args, use_task_group, normalize_task_id, normalize_task_display_name, "run" - ) elif node.resource_type == DbtResourceType.SOURCE: + args["select"] = f"source:{node.resource_name}" args["on_warning_callback"] = on_warning_callback if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( @@ -321,8 +329,7 @@ def create_task_metadata( and node.has_test is False ): return None - args["select"] = 
f"{node.resource_name}" - args.pop("models") + task_id, args = _get_task_id_and_args( node, args, use_task_group, normalize_task_id, normalize_task_display_name, "source" ) @@ -334,9 +341,15 @@ def create_task_metadata( else: args = {} return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args) - else: + else: # DbtResourceType.MODEL, DbtResourceType.SEED and DbtResourceType.SNAPSHOT + args[models_select_key] = node.resource_name task_id, args = _get_task_id_and_args( - node, args, use_task_group, normalize_task_id, normalize_task_display_name, node.resource_type.value + node=node, + args=args, + use_task_group=use_task_group, + normalize_task_id=normalize_task_id, + normalize_task_display_name=normalize_task_display_name, + resource_suffix=resource_suffix, ) _override_profile_if_needed(args, node.profile_config_to_override) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 84aaac71f5..05685e18c8 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -304,7 +304,8 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, package_name=node_dict.get("package_name"), resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=base_path / node_dict["original_file_path"], + # dbt-core defined the node path via "original_file_path", dbt fusion identifies it via "path"; the first key may be absent, so look it up with .get() and only then fall back to "path" + file_path=base_path / (node_dict.get("original_file_path") or node_dict["path"]), tags=node_dict.get("tags", []), config=node_dict.get("config", {}), has_freshness=( @@ -565,7 +566,16 @@ def run_dbt_ls( self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str] ) -> dict[str, DbtNode]: """Runs dbt ls command and returns the parsed nodes.""" - if self.render_config.source_rendering_behavior != SourceRenderingBehavior.NONE: + + # dbt fusion 2.0.0b26 `dbt ls --output json` returns, by default, fewer keys than dbt-core 1.10. 
+ # Default keys returned by dbt-core: ['name', 'resource_type', 'package_name', 'original_file_path', 'unique_id', 'alias', 'config', 'tags', 'depends_on'] + # Default keys returned by dbt fusion: ['name', 'package_name', 'path', 'resource_type', 'unique_id'] + # Users can force previous Cosmos behaviour by setting pre_dbt_fusion to True. + specify_output_keys = ( + not settings.pre_dbt_fusion or self.render_config.source_rendering_behavior != SourceRenderingBehavior.NONE + ) + + if specify_output_keys: ls_command = [ dbt_cmd, "ls", @@ -582,7 +592,12 @@ def run_dbt_ls( "freshness", ] else: - ls_command = [dbt_cmd, "ls", "--output", "json"] + ls_command = [ + dbt_cmd, + "ls", + "--output", + "json", + ] ls_args = self.dbt_ls_args ls_command.extend(self.local_flags) diff --git a/cosmos/settings.py b/cosmos/settings.py index d7be150437..7fe014a5ae 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -31,6 +31,7 @@ dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile") virtualenv_max_retries_lock = conf.getint("cosmos", "virtualenv_max_retries_lock", fallback=120) default_copy_dbt_packages = conf.getboolean("cosmos", "default_copy_dbt_packages", fallback=False) +pre_dbt_fusion = conf.getboolean("cosmos", "pre_dbt_fusion", fallback=False) # Experimentally adding `remote_cache_dir` as a separate entity in the Cosmos 1.6 release to gather feedback. # This will be merged with the `cache_dir` config parameter in upcoming releases. 
diff --git a/dev/dags/dbt/jaffle_shop/profiles.yml b/dev/dags/dbt/jaffle_shop/profiles.yml index de04db0fbc..4915fbd4e8 100644 --- a/dev/dags/dbt/jaffle_shop/profiles.yml +++ b/dev/dags/dbt/jaffle_shop/profiles.yml @@ -11,6 +11,18 @@ default: schema: "{{ env_var('POSTGRES_SCHEMA') }}" threads: 4 +snowflake_profile: + target: dev + outputs: + dev: + type: snowflake + account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}" + user: "{{ env_var('SNOWFLAKE_USER') }}" + password: "{{ env_var('SNOWFLAKE_PASSWORD') }}" + schema: "{{ env_var('SNOWFLAKE_SCHEMA') }}" + warehouse: "{{ env_var('SNOWFLAKE_WAREHOUSE') }}" + database: "{{ env_var('SNOWFLAKE_DATABASE') }}" + postgres_profile: target: dev outputs: diff --git a/pyproject.toml b/pyproject.toml index 736f52924c..0f6f1de63e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -167,7 +167,7 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} [[tool.hatch.envs.tests.matrix]] python = ["3.8", "3.9", "3.10", "3.11", "3.12"] airflow = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "3.0"] -dbt = ["1.5", "1.6", "1.7", "1.8", "1.9"] +dbt = ["1.5", "1.6", "1.7", "1.8", "1.9", "2.0"] [tool.hatch.envs.tests.overrides] matrix.airflow.dependencies = [ @@ -180,13 +180,16 @@ matrix.airflow.dependencies = [ freeze = "pip freeze" test = 'sh scripts/test/unit.sh' test-cov = 'sh scripts/test/unit-cov.sh' +test-integration-setup = 'sh scripts/test/integration-setup.sh {matrix:dbt}' test-integration = 'sh scripts/test/integration.sh' test-kubernetes = "sh scripts/test/integration-kubernetes.sh" test-kubernetes-setup = "sh scripts/test/kubernetes-setup.sh {matrix:dbt}" +test-integration-dbtf-setup = 'sh scripts/test/integration-dbtf-setup.sh' +test-integration-dbtf = 'sh scripts/test/integration-dbtf.sh' test-integration-dbt-1-5-4 = 'sh scripts/test/integration-dbt-1-5-4.sh' test-integration-dbt-async = 'sh scripts/test/integration-dbt-async.sh {matrix:dbt}' test-integration-expensive = 'sh 
scripts/test/integration-expensive.sh' -test-integration-setup = 'sh scripts/test/integration-setup.sh {matrix:dbt}' + test-performance = 'sh scripts/test/performance.sh' test-performance-setup = 'sh scripts/test/performance-setup.sh {matrix:dbt}' type-check = "pre-commit run mypy --files cosmos/**/*" @@ -195,7 +198,7 @@ type-check = "pre-commit run mypy --files cosmos/**/*" addopts = "--ignore-glob=**/dbt_packages/*" filterwarnings = ["ignore::DeprecationWarning"] minversion = "6.0" -markers = ["integration", "perf"] +markers = ["integration", "perf", "dbtfusion"] ###################################### # DOCS diff --git a/scripts/test/integration-dbt-1-5-4.sh b/scripts/test/integration-dbt-1-5-4.sh index 3e3118ec99..c86a80c3ec 100644 --- a/scripts/test/integration-dbt-1-5-4.sh +++ b/scripts/test/integration-dbt-1-5-4.sh @@ -24,7 +24,7 @@ pytest -vv \ --cov=cosmos \ --cov-report=term-missing \ --cov-report=xml \ - -m integration \ + -m 'integration and not dbtfusion' \ --ignore=tests/perf \ --ignore=tests/test_example_k8s_dags.py \ -k 'basic_cosmos_task_group' diff --git a/scripts/test/integration-dbtf-setup.sh b/scripts/test/integration-dbtf-setup.sh new file mode 100755 index 0000000000..20d3be5609 --- /dev/null +++ b/scripts/test/integration-dbtf-setup.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +set -v +set -x +set -e + +DBT_VERSION="$1" +NEXT_MINOR_VERSION=$(echo "$DBT_VERSION" | awk -F. '{print $1"."$2+1}') + +# we install using the following workaround to overcome installation conflicts, such as: +# apache-airflow 2.3.0 and dbt-core [0.13.0 - 1.5.2] and jinja2>=3.0.0 because these package versions have conflicting dependencies +pip uninstall -y 'dbt-bigquery' 'dbt-databricks' 'dbt-duckdb' 'dbt-postgres' 'dbt-vertica' 'dbt-core' +rm -rf airflow.* +pip freeze | grep airflow +airflow db reset -y + +AIRFLOW_VERSION=$(airflow version) +AIRFLOW_MAJOR_VERSION=$(echo "$AIRFLOW_VERSION" | cut -d. 
-f1) +if [ "$AIRFLOW_MAJOR_VERSION" -ge 3 ]; then + uv pip install cadwyn!=5.4.0 + echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db migrate'..." + airflow db migrate +else + echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db init'..." + airflow db init +fi + +uv pip freeze diff --git a/scripts/test/integration-dbtf.sh b/scripts/test/integration-dbtf.sh new file mode 100755 index 0000000000..62a2b6c7b0 --- /dev/null +++ b/scripts/test/integration-dbtf.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +set -x +set -e +set -v + +export SOURCE_RENDERING_BEHAVIOR=all +pip freeze | grep airflow +echo $AIRFLOW_HOME +ls $AIRFLOW_HOME +airflow db check +rm -rf dbt/jaffle_shop/dbt_packages; + + +# Note: the dbt Fusion Engine is in Beta! Bugs and missing functionality compared to dbt Core will be resolved +# continuously in the lead-up to a final release (see more details in https://github.com/dbt-labs/dbt-fusion) + +# Install dbt fusion (2.0.0-beta.26 on 23 June 2025) +curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update + +pytest -vv \ + tests/test_dbtf.py \ + --cov=cosmos \ + --cov-report=term-missing \ + --cov-report=xml \ + --durations=0 diff --git a/scripts/test/integration-expensive.sh b/scripts/test/integration-expensive.sh index 07e204983e..67d7a5d61a 100644 --- a/scripts/test/integration-expensive.sh +++ b/scripts/test/integration-expensive.sh @@ -3,7 +3,7 @@ pytest -vv \ --cov=cosmos \ --cov-report=term-missing \ --cov-report=xml \ - -m integration \ + -m 'integration and not dbtfusion' \ --ignore=tests/perf \ --ignore=tests/test_example_k8s_dags.py \ -k 'example_cosmos_python_models or example_virtualenv' diff --git a/scripts/test/integration-kubernetes.sh b/scripts/test/integration-kubernetes.sh index 9514c99b04..b8817cd870 100644 --- a/scripts/test/integration-kubernetes.sh +++ b/scripts/test/integration-kubernetes.sh @@ -11,5 +11,5 @@ pytest -vv \ --cov=cosmos \ --cov-report=term-missing \ --cov-report=xml \ - -m integration 
\ + -m 'integration and not dbtfusion' \ tests/test_example_k8s_dags.py diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index 60bb57e057..81dc1b8b99 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -18,7 +18,7 @@ pytest -vv \ --cov-report=term-missing \ --cov-report=xml \ --durations=0 \ - -m 'integration' \ + -m 'integration and not dbtfusion' \ --ignore=tests/perf \ --ignore=tests/test_async_example_dag.py \ --ignore=tests/test_example_k8s_dags.py \ diff --git a/scripts/test/unit.sh b/scripts/test/unit.sh index 39069c423a..373c109a73 100644 --- a/scripts/test/unit.sh +++ b/scripts/test/unit.sh @@ -1,6 +1,6 @@ pytest \ -vv \ - -m "not (integration or perf)" \ + -m "not (integration or perf or dbtfusion)" \ --ignore=tests/perf \ --ignore=tests/test_example_dags.py \ --ignore=tests/test_async_example_dag.py \ diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index b053da2c52..c3b21a8f9b 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -421,7 +421,7 @@ def test_create_task_metadata_unsupported(caplog): DbtResourceType.MODEL, "my_model_run", "cosmos.operators.local.DbtRunLocalOperator", - {"models": "my_model"}, + {"select": "my_model"}, { "dbt_dag_task_group_identifier": "", "dbt_node_config": { @@ -463,7 +463,7 @@ def test_create_task_metadata_unsupported(caplog): DbtResourceType.SNAPSHOT, "my_snapshot_snapshot", "cosmos.operators.local.DbtSnapshotLocalOperator", - {"models": "my_snapshot"}, + {"select": "my_snapshot"}, { "dbt_dag_task_group_identifier": "", "dbt_node_config": { @@ -525,7 +525,7 @@ def test_create_task_metadata_model_with_versions(caplog): ) assert metadata.id == "my_model_v1_run" assert metadata.operator_class == "cosmos.operators.local.DbtRunLocalOperator" - assert metadata.arguments == {"models": "my_model.v1"} + assert metadata.arguments == {"select": "my_model.v1"} def test_create_task_metadata_model_use_task_group(caplog): @@ -634,7 +634,7 @@ 
def test_create_task_metadata_seed(caplog, use_task_group): assert metadata.id == "seed" assert metadata.operator_class == "cosmos.operators.docker.DbtSeedDockerOperator" - assert metadata.arguments == {"models": "my_seed"} + assert metadata.arguments == {"select": "my_seed"} def test_create_task_metadata_snapshot(caplog): @@ -651,7 +651,7 @@ def test_create_task_metadata_snapshot(caplog): ) assert metadata.id == "my_snapshot_snapshot" assert metadata.operator_class == "cosmos.operators.kubernetes.DbtSnapshotKubernetesOperator" - assert metadata.arguments == {"models": "my_snapshot"} + assert metadata.arguments == {"select": "my_snapshot"} def _normalize_task_id(node: DbtNode) -> str: @@ -959,13 +959,13 @@ def test_build_airflow_graph_with_build_and_buildable_indirect_selection(): DbtResourceType.MODEL, f"{DbtResourceType.MODEL.value}.my_folder.node_name", TestIndirectSelection.EAGER, - {"models": "node_name"}, + {"select": "node_name"}, ), ( DbtResourceType.MODEL, f"{DbtResourceType.MODEL.value}.my_folder.node_name.v1", TestIndirectSelection.EAGER, - {"models": "node_name.v1"}, + {"select": "node_name.v1"}, ), ( DbtResourceType.SEED, diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 1bcfa68bc1..a4d12b0167 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1883,9 +1883,9 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": # We faced inconsistent hashing versions depending on the version of MacOS/Linux - the following line aims to address these. 
- assert hash_dir in ("64934a984040076870accfc177706353", "159b4a3432c3d0ebad32080a55697089") + assert hash_dir in ("c2c47529eaec412281bdb243a479b734", "efabb6a9130840317ded8d2c05caaea4") else: - assert hash_dir == "159b4a3432c3d0ebad32080a55697089" + assert hash_dir == "efabb6a9130840317ded8d2c05caaea4" @pytest.mark.integration @@ -2065,3 +2065,27 @@ def test__normalize_path(): original_value = "seeds\\seed_ifs_util_manual_event_id.csv" expected_value = "seeds/seed_ifs_util_manual_event_id.csv" assert _normalize_path(original_value) == expected_value + + +@pytest.mark.parametrize( + "pre_dbt_fusion_value,source_rendering_behaviour_value,expected_args_count", + [ + (True, SourceRenderingBehavior.NONE, 4), + (False, SourceRenderingBehavior.NONE, 13), + (True, SourceRenderingBehavior.ALL, 13), + (False, SourceRenderingBehavior.ALL, 13), + ], +) +@patch("cosmos.dbt.graph.settings") +@patch("cosmos.dbt.graph.run_command") +def test_run_dbt_ls( + mock_run_command, mock_settings, pre_dbt_fusion_value, source_rendering_behaviour_value, expected_args_count +): + mock_settings.pre_dbt_fusion = pre_dbt_fusion_value + graph = DbtGraph( + project=ProjectConfig(dbt_project_path="/tmp"), + render_config=RenderConfig(source_rendering_behavior=source_rendering_behaviour_value), + ) + graph.local_flags = [] + graph.run_dbt_ls(dbt_cmd="dbt", project_path=Path("/tmp"), tmp_dir=Path("/tmp"), env_vars={}) + assert len(mock_run_command.call_args[0][0]) == expected_args_count diff --git a/tests/test_converter.py b/tests/test_converter.py index 2ee2946aaf..c14c7c2629 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -237,14 +237,20 @@ def test_converter_creates_dag_with_test_with_multiple_parents(): # We exclude the test that depends on combined_model and model_a from their commands args = tasks["model.my_dbt_project.combined_model"].children["combined_model.test"].build_cmd({})[0] - assert args[1:] == ["test", "--exclude", "custom_test_combined_model_combined_model_", 
"--models", "combined_model"] + assert args[1:] == ["test", "--select", "combined_model", "--exclude", "custom_test_combined_model_combined_model_"] args = tasks["model.my_dbt_project.model_a"].children["model_a.test"].build_cmd({})[0] - assert args[1:] == ["test", "--exclude", "custom_test_combined_model_combined_model_", "--models", "model_a"] + assert args[1:] == [ + "test", + "--select", + "model_a", + "--exclude", + "custom_test_combined_model_combined_model_", + ] # The test for model_b should not be changed, since it is not a parent of this test args = tasks["model.my_dbt_project.model_b"].children["model_b.test"].build_cmd({})[0] - assert args[1:] == ["test", "--models", "model_b"] + assert args[1:] == ["test", "--select", "model_b"] # We should have a task dedicated to run the test with multiple parents args = tasks["test.my_dbt_project.custom_test_combined_model_combined_model_.c6e4587380"].build_cmd({})[0] @@ -286,14 +292,14 @@ def test_converter_creates_dag_with_test_with_multiple_parents_with_should_detac # We exclude the test that depends on combined_model and model_a from their commands args = tasks["model.my_dbt_project.combined_model"].children["combined_model.test"].build_cmd({})[0] - assert args[1:] == ["test", "--models", "combined_model"] + assert args[1:] == ["test", "--select", "combined_model"] args = tasks["model.my_dbt_project.model_a"].children["model_a.test"].build_cmd({})[0] - assert args[1:] == ["test", "--models", "model_a"] + assert args[1:] == ["test", "--select", "model_a"] # The test for model_b should not be changed, since it is not a parent of this test args = tasks["model.my_dbt_project.model_b"].children["model_b.test"].build_cmd({})[0] - assert args[1:] == ["test", "--models", "model_b"] + assert args[1:] == ["test", "--select", "model_b"] @pytest.mark.integration @@ -405,18 +411,18 @@ def test_converter_creates_dag_with_test_with_multiple_parents_and_build(): args = 
tasks["model.my_dbt_project.combined_model"].build_cmd({})[0] assert args[1:] == [ "build", + "--select", + "combined_model", "--exclude", "custom_test_combined_model_combined_model_", - "--models", - "combined_model", ] args = tasks["model.my_dbt_project.model_a"].build_cmd({})[0] - assert args[1:] == ["build", "--exclude", "custom_test_combined_model_combined_model_", "--models", "model_a"] + assert args[1:] == ["build", "--select", "model_a", "--exclude", "custom_test_combined_model_combined_model_"] # The test for model_b should not be changed, since it is not a parent of this test args = tasks["model.my_dbt_project.model_b"].build_cmd({})[0] - assert args[1:] == ["build", "--models", "model_b"] + assert args[1:] == ["build", "--select", "model_b"] # We should have a task dedicated to run the test with multiple parents args = tasks["test.my_dbt_project.custom_test_combined_model_combined_model_.c6e4587380"].build_cmd({})[0] @@ -923,3 +929,42 @@ def test_converter_contains_tasks_map(mock_load_dbt_graph, execution_mode, opera operator_args=operator_args, ) assert isinstance(converter.tasks_map, dict) + + +sample_model = DbtNode( + unique_id=f"{DbtResourceType.MODEL}.{SAMPLE_DBT_PROJECT.stem}.sample_model", + resource_type=DbtResourceType.MODEL, + depends_on=[], + file_path="", +) +nodes_with_model = {"sample_model": sample_model} + + +@patch("cosmos.airflow.graph.settings.pre_dbt_fusion", True) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes_with_model) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_creates_model_with_pre_dbt_fusion(mock_load_dbt_graph): + """ + Validate that, with the ``pre_dbt_fusion`` setting patched to True, the model task created by DbtToAirflowConverter has its ``models`` attribute set and its ``select`` attribute left as None + """ + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + 
target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + + converter = DbtToAirflowConverter( + dag=DAG("sample_dag", start_date=datetime(2024, 1, 1)), + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args={}, + ) + assert isinstance(converter.tasks_map, dict) + assert converter.tasks_map["sample_model"].models == "sample.sample_model" + assert converter.tasks_map["sample_model"].select is None diff --git a/tests/test_dbtf.py b/tests/test_dbtf.py new file mode 100644 index 0000000000..abac30f616 --- /dev/null +++ b/tests/test_dbtf.py @@ -0,0 +1,66 @@ +from datetime import datetime +from pathlib import Path + +import pytest +from airflow.utils.state import DagRunState + +from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig + +DBT_FUSION_BINARY = Path.home() / ".local/bin/dbt" +DBT_PROJECT_PATH = Path(__file__).parent.parent / "dev/dags/dbt/jaffle_shop" +DBT_PROFILES_YAML_FILEPATH = DBT_PROJECT_PATH / "profiles.yml" + +project_config = ProjectConfig( + dbt_project_path=DBT_PROJECT_PATH, +) + +profile_config = ProfileConfig( + profile_name="snowflake_profile", + target_name="dev", + profiles_yml_filepath=DBT_PROFILES_YAML_FILEPATH, +) + +execution_config = ExecutionConfig(dbt_executable_path=DBT_FUSION_BINARY) + + +@pytest.mark.integration +@pytest.mark.dbtfusion +def test_dbt_dag_with_dbt_fusion(): + """ + Run a DbtDag using dbt Fusion. + Confirm it succeeds and has the expected amount of both: + - dbt resources + - Airflow tasks + And that the tasks are in the expected topological order. 
+ """ + snowflake_dag = DbtDag( + execution_config=execution_config, + project_config=project_config, + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="snowflake_dbt_fusion_dag", + tags=["profiles"], + ) + outcome = snowflake_dag.test() + assert outcome.state == DagRunState.SUCCESS + + assert len(snowflake_dag.dbt_graph.filtered_nodes) == 26 + + assert len(snowflake_dag.task_dict) == 13 + tasks_names = [task.task_id for task in snowflake_dag.topological_sort()] + expected_task_names = [ + "raw_customers_seed", + "raw_orders_seed", + "raw_payments_seed", + "stg_customers.run", + "stg_customers.test", + "stg_orders.run", + "stg_orders.test", + "stg_payments.run", + "stg_payments.test", + "customers.run", + "customers.test", + "orders.run", + "orders.test", + ] + assert tasks_names == expected_task_names