diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 18824b4fc7..945aee94ac 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,26 @@ Changelog ========= +1.12.1 (2026-01-14) +---------------------- + +Bug Fixes + +* Fix ``DbtSourceWatcherOperator.template_fields`` to inherit from ``DbtSourceLocalOperator`` instead of ``DbtConsumerWatcherSensor`` by @pankajkoti in #2226 +* Fix TypeError in Watcher mode with subprocess invocation by @pankajkoti in #2227 +* Error when RenderConfig.invocation_mode is incorrectly set by @tatiana in #2267 + +Docs + +* Fix minor documentation typo by @dnskr in #2093 +* Fix default values in documentation by @dnskr in #2092 +* Remove emit event for ExecutionMode.AIRFLOW_ASYNC limitation in docs by @pankajastro in #2214 + +Others + +* Add test to check profile metrics with non-cosmos operator by @pankajastro in #2215 +* Fix CI main branch Airflow 2.6 tests by @tatiana in #2268 + 1.12.0 (2025-12-18) ---------------------- diff --git a/cosmos/__init__.py b/cosmos/__init__.py index d113ba240b..bbfc2b5ff6 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -9,7 +9,7 @@ from cosmos import settings -__version__ = "1.12.0" +__version__ = "1.12.1" if not settings.enable_memory_optimised_imports: from cosmos.airflow.dag import DbtDag diff --git a/cosmos/config.py b/cosmos/config.py index 6ae2a527a8..de07e59250 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -166,8 +166,7 @@ class ProjectConfig: :param copy_dbt_packages: Copy dbt_packages directory, if it exists, instead of creating a symbolic link. If not set, fetches the value from [cosmos]default_copy_dbt_packages (False by default). :param models_relative_path: The relative path to the dbt models directory within the project. Defaults to models :param seeds_relative_path: The relative path to the dbt seeds directory within the project. Defaults to seeds - :param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. 
Defaults to - snapshots + :param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. Defaults to snapshots :param manifest_path: The absolute path to the dbt manifest file. Defaults to None :param manifest_conn_id: Name of the Airflow connection used to access the manifest file if it is not stored locally. Defaults to None :param project_name: Allows the user to define the project name. diff --git a/cosmos/converter.py b/cosmos/converter.py index 854d4f2710..1a204c199b 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -23,7 +23,8 @@ from cosmos import cache, settings from cosmos.airflow.graph import build_airflow_graph from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import ExecutionMode, LoadMode +from cosmos.constants import ExecutionMode, InvocationMode, LoadMode +from cosmos.dbt.executable import get_system_dbt, is_dbt_installed_in_same_environment from cosmos.dbt.graph import DbtGraph from cosmos.dbt.project import has_non_empty_dependencies_file from cosmos.dbt.selector import retrieve_by_label @@ -174,6 +175,16 @@ def validate_initial_user_config( "please use ProjectConfig.env_vars instead." ) + if render_config is not None and render_config.invocation_mode == InvocationMode.DBT_RUNNER: + if not is_dbt_installed_in_same_environment(): + raise CosmosValueError( + "RenderConfig.invocation_mode is set to InvocationMode.DBT_RUNNER, but dbt is not installed in the same environment as Airflow. Use InvocationMode.DBT_SUBPROCESS instead." + ) + if render_config.dbt_executable_path and render_config.dbt_executable_path != get_system_dbt(): + raise CosmosValueError( + "RenderConfig.dbt_executable_path is set, but it is not the same as the system dbt executable path. Do not set render_config.dbt_executable_path when using InvocationMode.DBT_RUNNER." 
def is_dbt_installed_in_same_environment() -> bool:
    """
    Check whether the ``dbt`` package can be located in the current Python environment.

    ``importlib.util.find_spec`` reports a missing top-level module by returning ``None``,
    not by raising ``ImportError``, so the returned spec has to be inspected explicitly.

    :returns: ``True`` if a module spec for ``dbt`` is found, ``False`` otherwise.
    """
    try:
        dbt_spec = find_spec("dbt")
    except (ImportError, ValueError):
        # ``find_spec`` raises ValueError when ``dbt`` is already in ``sys.modules`` but its
        # ``__spec__`` is missing or ``None``; treat an undeterminable spec as "not installed".
        return False
    return dbt_spec is not None
dbt source freshness command, synchronously, as ExecutionMode.LOCAL. """ - template_fields: tuple[str, ...] = DbtConsumerWatcherSensor.template_fields + template_fields: Sequence[str] = DbtSourceLocalOperator.template_fields # type: ignore[assignment] class DbtRunWatcherOperator(DbtConsumerWatcherSensor): diff --git a/docs/configuration/generating-docs.rst b/docs/configuration/generating-docs.rst index ad9b926438..a0c4eb91db 100644 --- a/docs/configuration/generating-docs.rst +++ b/docs/configuration/generating-docs.rst @@ -80,7 +80,7 @@ You can use the :class:`~cosmos.operators.DbtDocsGCSOperator` to generate and up from cosmos.operators import DbtDocsGCSOperator # then, in your DAG code: - generate_dbt_docs_aws = DbtDocsGCSOperator( + generate_dbt_docs_gcs = DbtDocsGCSOperator( task_id="generate_dbt_docs_gcs", project_dir="path/to/jaffle_shop", profile_config=profile_config, @@ -113,7 +113,7 @@ The following code snippet shows how to provide this flag with the default jaffl from cosmos.operators import DbtDocsGCSOperator # then, in your DAG code: - generate_dbt_docs_aws = DbtDocsGCSOperator( + generate_dbt_docs_gcs = DbtDocsGCSOperator( task_id="generate_dbt_docs_gcs", project_dir="path/to/jaffle_shop", profile_config=profile_config, diff --git a/docs/configuration/project-config.rst b/docs/configuration/project-config.rst index 7eb771f1f3..4435f07873 100644 --- a/docs/configuration/project-config.rst +++ b/docs/configuration/project-config.rst @@ -5,12 +5,9 @@ The ``cosmos.config.ProjectConfig`` allows you to specify information about wher variables that should be used for rendering and execution. It takes the following arguments: - ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file -- ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. 
This defaults to - ``models/`` -- ``seeds_relative_path``: The path to your seeds directory, relative to the ``dbt_project_path``. This defaults to - ``data/`` -- ``snapshots_relative_path``: The path to your snapshots directory, relative to the ``dbt_project_path``. This defaults - to ``snapshots/`` +- ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. This defaults to ``models`` +- ``seeds_relative_path``: The path to your seeds directory, relative to the ``dbt_project_path``. This defaults to ``seeds`` +- ``snapshots_relative_path``: The path to your snapshots directory, relative to the ``dbt_project_path``. This defaults to ``snapshots`` - ``manifest_path``: The absolute path to your manifests directory. This is only required if you're using Cosmos' manifest parsing mode. Along with supporting local paths for manifest parsing, starting with Cosmos 1.6.0, if you've Airflow >= 2.8.0, Cosmos also supports remote paths for manifest parsing(e.g. S3 URL). See :ref:`parsing-methods` for more details. @@ -39,9 +36,9 @@ Project Config Example config = ProjectConfig( dbt_project_path="/path/to/dbt/project", - models_relative_path="models", - seeds_relative_path="data", - snapshots_relative_path="snapshots", + models_relative_path="custom_models_folder", + seeds_relative_path="custom_seeds_folder", + snapshots_relative_path="custom_snapshots_folder", manifest_path="/path/to/manifests", env_vars={"MY_ENV_VAR": "my_env_value"}, dbt_vars={ diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 95687ac4db..5ac27e03cb 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -7,7 +7,7 @@ It does this by exposing a ``cosmos.config.RenderConfig`` class that you can use The ``RenderConfig`` class takes the following arguments: -- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. 
This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV`` and ``ExecutionMode.WATCHER``. +- ``emit_datasets``: whether or not to emit Airflow datasets to be used for data-aware scheduling. Defaults to True. This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV``, ``ExecutionMode.WATCHER`` and ``ExecutionMode.AIRFLOW_ASYNC``. - ``test_behavior``: how to run tests. Defaults to running a model's tests immediately after the model is run. For more information, see the `Testing Behavior `_ section. - ``load_method``: how to load your dbt project. See `Parsing Methods `_ for more information. - ``invocation_mode``: (new in v1.9) how to run ``dbt ls``, when using ``LoadMode.DBT_LS``. Learn more about this below. diff --git a/docs/configuration/scheduling.rst b/docs/configuration/scheduling.rst index b6e812a136..d9e860ec2a 100644 --- a/docs/configuration/scheduling.rst +++ b/docs/configuration/scheduling.rst @@ -31,7 +31,7 @@ By default, if using a version between Airflow 2.4 or higher is used, Cosmos emi .. important:: - This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV`` and ``ExecutionMode.WATCHER``. + This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV``, ``ExecutionMode.WATCHER`` and ``ExecutionMode.AIRFLOW_ASYNC``. Cosmos calculates these URIs during the task execution, by using the library `OpenLineage Integration Common `_. diff --git a/docs/getting_started/async-execution-mode.rst b/docs/getting_started/async-execution-mode.rst index aa6e96b0b3..b502abd865 100644 --- a/docs/getting_started/async-execution-mode.rst +++ b/docs/getting_started/async-execution-mode.rst @@ -248,7 +248,4 @@ Limitations 9. **TeardownAsyncOperator limitation**: When using a remote object location, in addition to the ``SetupAsyncOperator``, a ``TeardownAsyncOperator`` is also added to the DAG. 
This task will delete the SQL files from the remote location by the end of the DAG Run. This is can lead to a limitation from a retry perspective, as described in the issue `#2066 `_. This can be avoided by setting the ``enable_teardown_async_task`` configuration to ``False``, as described in the :ref:`enable_teardown_async_task` section. -10. **Dataset events not emitted**: Dataset events are not currently emitted after dbt models complete when using ``ExecutionMode.AIRFLOW_ASYNC``. This means downstream DAGs scheduled with ``Dataset`` or ``DatasetAlias`` will not trigger automatically. This behaviour is present in ``ExecutionMode.LOCAL`` but is currently missing in async mode. This issue is being tracked in `#2141 `_. - - For a comparison between different Cosmos execution modes, please, check the :ref:`execution-modes-comparison` section. diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 36bf121ac7..3bf9d1680f 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -44,7 +44,7 @@ uv pip install "gcsfs<2025.3.0" if [ "$AIRFLOW_VERSION" = "2.6" ] ; then uv pip install "apache-airflow-providers-amazon" "apache-airflow==$AIRFLOW_VERSION" "urllib3<2" uv pip install "apache-airflow-providers-cncf-kubernetes" "apache-airflow==$AIRFLOW_VERSION" - uv pip install "apache-airflow-providers-google<10.11" "apache-airflow==$AIRFLOW_VERSION" + uv pip install "apache-airflow-providers-google<10.11" "httplib2==0.31.0" "apache-airflow==$AIRFLOW_VERSION" uv pip install "apache-airflow-providers-microsoft-azure" "apache-airflow==$AIRFLOW_VERSION" uv pip install "pydantic<2.0" elif [ "$AIRFLOW_VERSION" = "2.7" ] ; then diff --git a/tests/listeners/test_task_instance_listener.py b/tests/listeners/test_task_instance_listener.py index 2afce2809c..1ac1c5a395 100644 --- a/tests/listeners/test_task_instance_listener.py +++ b/tests/listeners/test_task_instance_listener.py @@ -2,17 +2,29 @@ from pathlib import 
class DummyOperator(BaseOperator):
    """Minimal Airflow operator that is not a Cosmos operator; its ``execute`` does nothing."""

    def execute(self, context: Context) -> Any:
        return None


def test_profile_metrics_with_non_cosmos_operator():
    """Metrics built for a non-Cosmos operator carry its class name and no profile details."""
    task_instance = _make_task_instance(DummyOperator(task_id="test"))
    collected = task_instance_listener._build_task_metrics(task_instance, status="success")

    assert collected["operator_name"] == "DummyOperator"
    assert collected["is_cosmos_operator_subclass"] is False
    # None of the profile-related entries should be populated for a plain Airflow operator
    for profile_key in ("profile_strategy", "profile_mapping_class", "database"):
        assert collected[profile_key] is None
class TestStoreDbStatusFromLog:
    """Exercise ``_store_dbt_resource_status_from_log`` directly and via ``_process_log_line_callable``."""

    @staticmethod
    def _node_event(status, unique_id):
        """Serialise a dbt JSON log line reporting ``status`` for the node ``unique_id``."""
        return json.dumps({"data": {"node_info": {"node_status": status, "unique_id": unique_id}}})

    @staticmethod
    def _task_instance_with_kwargs():
        """Return a fresh ``_MockTI`` and the kwargs dict that wraps it in an Airflow-style context."""
        task_instance = _MockTI()
        return task_instance, {"context": {"ti": task_instance}}

    def test_store_dbt_resource_status_from_log_success(self):
        """A ``success`` node status ends up in the task instance store."""
        task_instance, call_kwargs = self._task_instance_with_kwargs()

        _store_dbt_resource_status_from_log(self._node_event("success", "model.pkg.my_model"), call_kwargs)

        assert task_instance.store.get("model__pkg__my_model_status") == "success"

    def test_store_dbt_resource_status_from_log_failed(self):
        """A ``failed`` node status ends up in the task instance store."""
        task_instance, call_kwargs = self._task_instance_with_kwargs()

        _store_dbt_resource_status_from_log(self._node_event("failed", "model.pkg.failed_model"), call_kwargs)

        assert task_instance.store.get("model__pkg__failed_model_status") == "failed"

    def test_store_dbt_resource_status_from_log_ignores_other_statuses(self):
        """Statuses other than success/failed leave the store untouched."""
        task_instance, call_kwargs = self._task_instance_with_kwargs()

        _store_dbt_resource_status_from_log(self._node_event("running", "model.pkg.running_model"), call_kwargs)

        assert "model__pkg__running_model_status" not in task_instance.store

    def test_store_dbt_resource_status_from_log_handles_invalid_json(self, caplog):
        """A line that is not valid JSON is tolerated and stores nothing."""
        task_instance, call_kwargs = self._task_instance_with_kwargs()

        # The call itself must complete without raising
        _store_dbt_resource_status_from_log("not valid json {{{", call_kwargs)

        assert len(task_instance.store) == 0

    def test_store_dbt_resource_status_from_log_handles_missing_node_info(self):
        """A JSON line without ``node_info`` is tolerated and stores nothing."""
        task_instance, call_kwargs = self._task_instance_with_kwargs()
        line_without_node_info = json.dumps({"data": {"other_key": "value"}})

        # The call itself must complete without raising
        _store_dbt_resource_status_from_log(line_without_node_info, call_kwargs)

        assert len(task_instance.store) == 0

    def test_process_log_line_callable_is_not_bound_method(self):
        """Reading ``_process_log_line_callable`` from an instance must yield the plain function.

        Guards against the attribute being turned into a bound method on instance access,
        which would pass ``self`` as an unexpected first argument.
        """
        import inspect

        producer = DbtProducerWatcherOperator(project_dir=".", profile_config=None)
        via_instance = producer._process_log_line_callable

        # A bound method would carry a ``__self__`` and be reported by ``inspect.ismethod``
        is_bound = inspect.ismethod(via_instance)
        assert not is_bound, "_process_log_line_callable should not be a bound method when accessed through instance"

        # The very same function object must come back
        assert via_instance is _store_dbt_resource_status_from_log

    def test_process_log_line_callable_accepts_two_arguments(self):
        """The callable read from an instance works when invoked as ``callable(line, kwargs)``."""
        producer = DbtProducerWatcherOperator(project_dir=".", profile_config=None)
        via_instance = producer._process_log_line_callable
        task_instance, call_kwargs = self._task_instance_with_kwargs()

        # Must not fail with a TypeError about the number of positional arguments
        via_instance(self._node_event("success", "model.pkg.test_model"), call_kwargs)

        assert task_instance.store.get("model__pkg__test_model_status") == "success"

    def test_process_log_line_callable_integration_with_subprocess_pattern(self):
        """Replay the ``process_log_line(line, kwargs)`` loop over several log lines."""
        producer = DbtProducerWatcherOperator(project_dir=".", profile_config=None)
        task_instance = _MockTI()

        # kwargs carry the context plus an unrelated extra entry
        kwargs = {"context": {"ti": task_instance}, "other_param": "value"}

        emitted_lines = [
            self._node_event("success", "model.pkg.model_a"),
            self._node_event("failed", "model.pkg.model_b"),
            json.dumps({"info": {"msg": "Running with dbt=1.10.11"}}),  # line without node information
        ]

        process_log_line = producer._process_log_line_callable
        for line in emitted_lines:
            if process_log_line:
                process_log_line(line, kwargs)

        assert task_instance.store.get("model__pkg__model_a_status") == "success"
        assert task_instance.store.get("model__pkg__model_b_status") == "failed"
        # Only the two node lines should have produced entries
        assert len(task_instance.store) == 2
def test_validate_initial_user_config_dbt_runner_without_dbt_installed():
    """DBT_RUNNER is rejected when dbt is reported as not installed next to Airflow."""
    with patch("cosmos.converter.is_dbt_installed_in_same_environment", return_value=False):
        project_config = ProjectConfig()
        execution_config = ExecutionConfig()
        render_config = RenderConfig(invocation_mode=InvocationMode.DBT_RUNNER)
        profile_config = MagicMock()

        message_pattern = "RenderConfig.invocation_mode is set to InvocationMode.DBT_RUNNER, but dbt is not installed in the same environment as Airflow.*"
        with pytest.raises(CosmosValueError, match=message_pattern):
            validate_initial_user_config(execution_config, profile_config, project_config, render_config, {})


def test_validate_initial_user_config_dbt_runner_with_different_dbt_executable_path():
    """DBT_RUNNER is rejected when ``dbt_executable_path`` differs from the system dbt path."""
    with (
        patch("cosmos.converter.is_dbt_installed_in_same_environment", return_value=True),
        patch("cosmos.converter.get_system_dbt", return_value="/usr/local/bin/dbt"),
    ):
        project_config = ProjectConfig()
        execution_config = ExecutionConfig()
        render_config = RenderConfig(
            invocation_mode=InvocationMode.DBT_RUNNER,
            dbt_executable_path="/custom/path/to/dbt",
        )
        profile_config = MagicMock()

        message_pattern = (
            "RenderConfig.dbt_executable_path is set, but it is not the same as the system dbt executable path.*"
        )
        with pytest.raises(CosmosValueError, match=message_pattern):
            validate_initial_user_config(execution_config, profile_config, project_config, render_config, {})


def test_validate_initial_user_config_dbt_runner_with_matching_dbt_executable_path():
    """DBT_RUNNER is accepted when ``dbt_executable_path`` equals the system dbt path."""
    with (
        patch("cosmos.converter.is_dbt_installed_in_same_environment", return_value=True),
        patch("cosmos.converter.get_system_dbt", return_value="/usr/local/bin/dbt"),
    ):
        project_config = ProjectConfig()
        execution_config = ExecutionConfig()
        render_config = RenderConfig(
            invocation_mode=InvocationMode.DBT_RUNNER,
            dbt_executable_path="/usr/local/bin/dbt",
        )
        profile_config = MagicMock()

        # Passing means the call returns without raising
        validate_initial_user_config(execution_config, profile_config, project_config, render_config, {})


def test_validate_initial_user_config_dbt_runner_without_dbt_executable_path():
    """DBT_RUNNER is accepted when ``dbt_executable_path`` is not passed to ``RenderConfig``."""
    with patch("cosmos.converter.is_dbt_installed_in_same_environment", return_value=True):
        project_config = ProjectConfig()
        execution_config = ExecutionConfig()
        render_config = RenderConfig(invocation_mode=InvocationMode.DBT_RUNNER)
        profile_config = MagicMock()

        # Passing means the call returns without raising
        validate_initial_user_config(execution_config, profile_config, project_config, render_config, {})