Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,14 @@ def __init__(
# To keep this logic working, if converter is given no ProfileConfig,
# we can create a default retaining this value to preserve this functionality.
# We may want to consider defaulting this value in our actual ProjceConfig class?
dbt_graph = DbtGraph(
self.dbt_graph = DbtGraph(
project=project_config,
render_config=render_config,
execution_config=execution_config,
profile_config=profile_config,
dbt_vars=dbt_vars,
)
dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)
self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)

task_args = {
**operator_args,
Expand All @@ -266,7 +266,7 @@ def __init__(
)

build_airflow_graph(
nodes=dbt_graph.filtered_nodes,
nodes=self.dbt_graph.filtered_nodes,
dag=dag or (task_group and task_group.dag),
task_group=task_group,
execution_mode=execution_config.execution_mode,
Expand Down
33 changes: 32 additions & 1 deletion tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import DbtResourceType, ExecutionMode, InvocationMode, LoadMode
from cosmos.converter import DbtToAirflowConverter, validate_arguments, validate_initial_user_config
from cosmos.dbt.graph import DbtNode
from cosmos.dbt.graph import DbtGraph, DbtNode
from cosmos.exceptions import CosmosValueError
from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping

Expand Down Expand Up @@ -468,3 +468,34 @@ def test_converter_invocation_mode_added_to_task_args(
assert kwargs["task_args"]["invocation_mode"] == invocation_mode
else:
assert "invocation_mode" not in kwargs["task_args"]


@pytest.mark.parametrize(
"execution_mode,operator_args",
[
(ExecutionMode.KUBERNETES, {}),
],
)
@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes)
@patch("cosmos.converter.DbtGraph.load")
def test_converter_contains_dbt_graph(mock_load_dbt_graph, execution_mode, operator_args):
"""
This test validates that DbtToAirflowConverter contains and exposes a DbtGraph instance
"""
project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
execution_config = ExecutionConfig(execution_mode=execution_mode)
render_config = RenderConfig(emit_datasets=True)
profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
)
converter = DbtToAirflowConverter(
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
operator_args=operator_args,
)
assert isinstance(converter.dbt_graph, DbtGraph)