Skip to content
Merged
4 changes: 4 additions & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ def create_task_metadata(
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE,
normalize_task_id: Callable[..., Any] | None = None,
test_behavior: TestBehavior = TestBehavior.AFTER_ALL,
test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER,
on_warning_callback: Callable[..., Any] | None = None,
detached_from_parent: dict[str, DbtNode] | None = None,
) -> TaskMetadata | None:
Expand Down Expand Up @@ -271,6 +272,8 @@ def create_task_metadata(
}

if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES:
if test_indirect_selection != TestIndirectSelection.EAGER:
args["indirect_selection"] = test_indirect_selection.value
args["on_warning_callback"] = on_warning_callback
exclude_detached_tests_if_needed(node, args, detached_from_parent)
task_id, args = _get_task_id_and_args(
Expand Down Expand Up @@ -367,6 +370,7 @@ def generate_task_or_group(
source_rendering_behavior=source_rendering_behavior,
normalize_task_id=normalize_task_id,
test_behavior=test_behavior,
test_indirect_selection=test_indirect_selection,
on_warning_callback=on_warning_callback,
detached_from_parent=detached_from_parent,
)
Expand Down
37 changes: 37 additions & 0 deletions tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,43 @@ def test_create_task_metadata_normalize_task_id(
assert "task_display_name" not in metadata.arguments


@pytest.mark.integration
def test_build_airflow_graph_with_build_and_buildable_indirect_selection():
with DAG("test-build-buildable", start_date=datetime(2022, 1, 1)) as dag:
task_args = {
"project_dir": SAMPLE_PROJ_PATH,
"conn_id": "fake_conn",
"profile_config": ProfileConfig(
profile_name="default",
target_name="default",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="fake_conn",
profile_args={"schema": "public"},
),
),
}
render_config = RenderConfig(
test_behavior=TestBehavior.BUILD,
)
build_airflow_graph(
nodes=sample_nodes,
dag=dag,
execution_mode=ExecutionMode.LOCAL,
test_indirect_selection=TestIndirectSelection.BUILDABLE,
task_args=task_args,
dbt_project_name="astro_shop",
render_config=render_config,
)

topological_sort = [task.task_id for task in dag.topological_sort()]
expected_sort = ["seed_parent_seed_build", "parent_model_build", "child_model_build", "child2_v2_model_build"]
assert topological_sort == expected_sort

for task in dag.tasks:
if hasattr(task, "indirect_selection"):
assert task.indirect_selection == TestIndirectSelection.BUILDABLE.value


@pytest.mark.parametrize(
"node_type,node_unique_id,test_indirect_selection,additional_arguments",
[
Expand Down