Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,23 @@ def generate_task_or_group(
filtered_nodes=filtered_nodes,
)

# create_task_metadata() returns None for node types not in dbt_resource_to_class (e.g.dynamically-resolved types
# like "exposure", "analysis", or user-defined types). When the user has supplied a node_converter for that type,
# create a stub TaskMetadata so that generate_or_convert_task() can pass the node through to the converter.
if task_meta is None and node.resource_type in node_converters:
task_id, _ = _get_task_id_and_args(
node=node,
args={},
use_task_group=False,
normalize_task_id=render_config.normalize_task_id,
normalize_task_display_name=render_config.normalize_task_display_name,
resource_suffix=node.resource_type.value,
execution_mode=execution_mode,
)

# Stub for task_meta, "" operator class
task_meta = TaskMetadata(id=task_id, operator_class="", arguments={})

generate_or_convert_task_args = {
"task_meta": task_meta,
"dbt_project_name": dbt_project_name,
Expand Down
2 changes: 1 addition & 1 deletion dev/dags/example_cosmos_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def convert_exposure(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
)

project_config = ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
DBT_ROOT_PATH / "altered_jaffle_shop",
dbt_vars={"animation_alias": "top_5_animated_movies"},
)

Expand Down
84 changes: 84 additions & 0 deletions tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,90 @@ def test_create_task_group_for_after_each_supported_nodes(node_type: DbtResource
assert list(output.children.keys()) == [f"dbt_node.{task_suffix}", "dbt_node.test"]


def test_generate_task_or_group_with_dynamic_node_type_and_converter():
"""A node whose resource_type is not in dbt_resource_to_class should be rendered via a user-supplied converter."""

def _convert_dynamic(dag: DAG, task_group: TaskGroup, node: DbtNode, task_id: str, **kwargs):
return EmptyOperator(dag=dag, task_group=task_group, task_id=task_id)

with DAG("test-dynamic-converter", start_date=datetime(2022, 1, 1)) as dag:
node = DbtNode(
unique_id="exposure.dbt-proj.my_exposure",
resource_type=DbtResourceType("exposure"),
depends_on=[],
path_base=Path("."),
original_file_path=Path("."),
)

output = generate_task_or_group(
dag=dag,
task_group=None,
node=node,
execution_mode=ExecutionMode.LOCAL,
test_indirect_selection=TestIndirectSelection.EAGER,
task_args={
"project_dir": SAMPLE_PROJ_PATH,
"profile_config": ProfileConfig(
profile_name="default",
target_name="default",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="fake_conn",
profile_args={"schema": "public"},
),
),
},
dbt_project_name="astro_shop",
node_converters={DbtResourceType("exposure"): _convert_dynamic},
render_config=RenderConfig(
test_behavior=TestBehavior.NONE,
source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR,
),
on_warning_callback=None,
)
assert isinstance(output, EmptyOperator)


def test_generate_task_or_group_with_dynamic_node_type_no_converter_returns_none():
"""A node whose resource_type is not in dbt_resource_to_class and has no converter should be skipped."""
with DAG("test-dynamic-no-converter", start_date=datetime(2022, 1, 1)) as dag:
node = DbtNode(
unique_id="exposure.dbt-proj.my_exposure",
resource_type=DbtResourceType("exposure"),
depends_on=[],
path_base=Path("."),
original_file_path=Path("."),
)

output = generate_task_or_group(
dag=dag,
task_group=None,
node=node,
execution_mode=ExecutionMode.LOCAL,
test_indirect_selection=TestIndirectSelection.EAGER,
task_args={
"project_dir": SAMPLE_PROJ_PATH,
"profile_config": ProfileConfig(
profile_name="default",
target_name="default",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="fake_conn",
profile_args={"schema": "public"},
),
),
},
dbt_project_name="astro_shop",
node_converters={},
render_config=RenderConfig(
test_behavior=TestBehavior.NONE,
source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR,
),
on_warning_callback=None,
)

# No converter is supplied
assert output is None


@pytest.mark.integration
def test_build_airflow_graph_with_after_all():
with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
Expand Down
Loading