Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def override_configuration(
if execution_config.invocation_mode:
operator_args["invocation_mode"] = execution_config.invocation_mode

if execution_config in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):
if execution_config.execution_mode in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):
if "install_deps" not in operator_args:
operator_args["install_deps"] = project_config.install_dbt_deps

Expand Down
57 changes: 55 additions & 2 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ def test_validate_initial_user_config_expects_profile(execution_mode):
assert validate_initial_user_config(execution_config, profile_config, project_config, None, {}) is None


@pytest.mark.parametrize("operator_args", [{"env": {"key": "value"}}, {"vars": {"key": "value"}}])
@pytest.mark.parametrize(
"operator_args", [{"env": {"key": "value"}}, {"vars": {"key": "value"}}, {"install_deps": {"key": "value"}}]
)
def test_validate_user_config_operator_args_deprecated(operator_args):
"""Deprecating warnings should be raised when using operator_args with "vars" or "env"."""
"""Deprecating warnings should be raised when using operator_args with "vars", "env" or "install_deps"."""
project_config = ProjectConfig()
execution_config = ExecutionConfig()
render_config = RenderConfig()
Expand Down Expand Up @@ -709,6 +711,57 @@ def test_validate_converter_fetches_project_name_from_render_config(
assert mock_build_airflow_graph.call_args.kwargs["dbt_project_name"] == "project1"


@pytest.mark.parametrize(
"execution_mode,operator_args,install_dbt_deps,expected",
[
(ExecutionMode.KUBERNETES, {}, False, None),
(ExecutionMode.LOCAL, {}, False, False),
(ExecutionMode.VIRTUALENV, {}, False, False),
(ExecutionMode.LOCAL, {}, True, True),
(ExecutionMode.VIRTUALENV, {}, True, True),
(ExecutionMode.KUBERNETES, {"install_deps": True}, False, True),
(ExecutionMode.LOCAL, {"install_deps": True}, False, True),
(ExecutionMode.VIRTUALENV, {"install_deps": True}, False, True),
],
)
@patch("cosmos.config.ProjectConfig.validate_project")
@patch("cosmos.converter.validate_initial_user_config")
@patch("cosmos.converter.DbtGraph")
@patch("cosmos.converter.build_airflow_graph")
def test_project_config_install_dbt_deps_overrides_operator_args(
mock_build_airflow_graph,
mock_user_config,
mock_dbt_graph,
mock_validate_project,
execution_mode,
operator_args,
install_dbt_deps,
expected,
):
"""Tests that the value project_config.install_dbt_deps is used to define operator_args["install_deps"] if
execution mode is ExecutionMode.LOCAL or ExecutionMode.VIRTUALENV and operator_args["install_deps"] is not
already defined.
"""
project_config = ProjectConfig(project_name="fake-project", dbt_project_path="/some/project/path")
project_config.install_dbt_deps = install_dbt_deps
execution_config = ExecutionConfig(execution_mode=execution_mode)
render_config = MagicMock()
profile_config = MagicMock()
with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
DbtToAirflowConverter(
dag=dag,
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
operator_args=operator_args,
)
_, kwargs = mock_build_airflow_graph.call_args

assert kwargs["task_args"].get("install_deps", None) == expected


@pytest.mark.parametrize("invocation_mode", [None, InvocationMode.SUBPROCESS, InvocationMode.DBT_RUNNER])
@patch("cosmos.config.ProjectConfig.validate_project")
@patch("cosmos.converter.validate_initial_user_config")
Expand Down