Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def create_dbt_resource_to_class(test_behavior: TestBehavior) -> dict[str, str]:
return dbt_resource_to_class


def create_task_metadata(
def create_task_metadata( # noqa: C901
node: DbtNode,
execution_mode: ExecutionMode,
args: dict[str, Any],
Expand Down Expand Up @@ -342,7 +342,10 @@ def create_task_metadata(
models_select_key = "models" if settings.pre_dbt_fusion else "select"

if render_config.test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES:
args[models_select_key] = f"{node.resource_name}"
if node.fqn and len(node.fqn) > 0:
args[models_select_key] = f"fqn:{'.'.join(node.fqn)}"
else:
args[models_select_key] = f"{node.resource_name}"
if test_indirect_selection != TestIndirectSelection.EAGER:
args["indirect_selection"] = test_indirect_selection.value
args["on_warning_callback"] = on_warning_callback
Expand Down Expand Up @@ -392,7 +395,10 @@ def create_task_metadata(
args = {}
return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args)
else: # DbtResourceType.MODEL, DbtResourceType.SEED and DbtResourceType.SNAPSHOT
args[models_select_key] = node.resource_name
if node.fqn and len(node.fqn) > 0:
args[models_select_key] = f"fqn:{'.'.join(node.fqn)}"
else:
args[models_select_key] = node.resource_name
task_id, args = _get_task_id_and_args(
node=node,
args=args,
Expand Down
4 changes: 4 additions & 0 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class DbtNode:
has_test: bool = False
has_non_detached_test: bool = False
downstream: list[str] = field(default_factory=lambda: [])
fqn: list[str] | None = None

@property
def meta(self) -> dict[str, Any]:
Expand Down Expand Up @@ -342,6 +343,7 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE
else False
),
fqn=node_dict.get("fqn"),
)
except (KeyError, TypeError):
logger.info("Could not parse following the dbt ls line even though it was a valid JSON `%s`", line)
Expand Down Expand Up @@ -645,6 +647,7 @@ def run_dbt_ls(
"tags",
"config",
"freshness",
"fqn",
]
Comment thread
pankajastro marked this conversation as resolved.
else:
ls_command = [
Expand Down Expand Up @@ -1161,6 +1164,7 @@ def load_from_dbt_manifest(self) -> None:
if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE
else False
),
fqn=node_dict.get("fqn"),
)

nodes[node.unique_id] = node
Expand Down
6 changes: 3 additions & 3 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2683,9 +2683,9 @@ def test__normalize_path():
"pre_dbt_fusion_value,source_rendering_behaviour_value,expected_args_count",
[
(True, SourceRenderingBehavior.NONE, 4),
(False, SourceRenderingBehavior.NONE, 13),
(True, SourceRenderingBehavior.ALL, 13),
(False, SourceRenderingBehavior.ALL, 13),
(False, SourceRenderingBehavior.NONE, 14),
(True, SourceRenderingBehavior.ALL, 14),
(False, SourceRenderingBehavior.ALL, 14),
],
)
@patch("cosmos.dbt.graph.settings")
Expand Down
2 changes: 1 addition & 1 deletion tests/operators/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,7 @@ def test_dbt_task_group_with_watcher_has_correct_templated_dbt_cmd():
assert full_cmd[1] == "run" # dbt run command

cmd = " ".join(full_cmd)
assert "--select stg_customers" in cmd
assert "--select fqn:jaffle_shop.staging.stg_customers" in cmd
assert "--threads 1" in cmd


Expand Down
14 changes: 10 additions & 4 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,17 +527,23 @@ def test_converter_creates_dag_with_test_with_multiple_parents_and_build():
assert args[1:] == [
"build",
"--select",
"combined_model",
"fqn:my_dbt_project.combined_model",
"--exclude",
"custom_test_combined_model_combined_model_",
]

args = tasks["model.my_dbt_project.model_a"].build_cmd({})[0]
assert args[1:] == ["build", "--select", "model_a", "--exclude", "custom_test_combined_model_combined_model_"]
assert args[1:] == [
"build",
"--select",
"fqn:my_dbt_project.model_a",
"--exclude",
"custom_test_combined_model_combined_model_",
]

# The test for model_b should not be changed, since it is not a parent of this test
# The command for model_b should not add any exclusions, since it is not a parent of this test
args = tasks["model.my_dbt_project.model_b"].build_cmd({})[0]
assert args[1:] == ["build", "--select", "model_b"]
assert args[1:] == ["build", "--select", "fqn:my_dbt_project.model_b"]

# We should have a task dedicated to run the test with multiple parents
args = tasks["test.my_dbt_project.custom_test_combined_model_combined_model_.c6e4587380"].build_cmd({})[0]
Expand Down