From cefaa21e16fdc5c910bd1f2f680084ddf2cfb36b Mon Sep 17 00:00:00 2001 From: Anya Conti Date: Thu, 8 May 2025 18:02:35 -0400 Subject: [PATCH 01/23] Modifies the node_converter logic to be at the task level instead of task group level --- cosmos/airflow/graph.py | 181 +++++++++++++++++++++++++++++++----- tests/airflow/test_graph.py | 148 +++++++++++++++++++++++++++++ 2 files changed, 307 insertions(+), 22 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 20229f3fe7..c4ee51fd05 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -87,7 +87,7 @@ def calculate_leaves(tasks_ids: list[str], nodes: dict[str, DbtNode]) -> list[st def exclude_detached_tests_if_needed( node: DbtNode, task_args: dict[str, str], - detached_from_parent: dict[str, DbtNode] | None = None, + detached_from_parent: dict[str, list[DbtNode]] | None = None, ) -> None: """ Add exclude statements if there are tests associated to the model that should be run detached from the model/tests. @@ -128,7 +128,7 @@ def create_test_task_metadata( on_warning_callback: Callable[..., Any] | None = None, node: DbtNode | None = None, render_config: RenderConfig | None = None, - detached_from_parent: dict[str, DbtNode] | None = None, + detached_from_parent: dict[str, list[DbtNode]] | None = None, ) -> TaskMetadata: """ Create the metadata that will be used to instantiate the Airflow Task that will be used to run the Dbt test node. @@ -249,7 +249,7 @@ def create_task_metadata( test_behavior: TestBehavior = TestBehavior.AFTER_ALL, test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER, on_warning_callback: Callable[..., Any] | None = None, - detached_from_parent: dict[str, DbtNode] | None = None, + detached_from_parent: dict[str, list[DbtNode]] | None = None, ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -344,6 +344,63 @@ def is_detached_test(node: DbtNode) -> bool: return False +def generate_or_convert_task( + node_converters: dict[DbtResourceType, Callable[..., Any]], + task_meta: TaskMetadata, + dbt_project_name: str, + dag: DAG, + task_group: TaskGroup | None, + node: DbtNode, + resource_type: DbtResourceType, + execution_mode: ExecutionMode, + task_args: dict[str, Any], + test_behavior: TestBehavior, + source_rendering_behavior: SourceRenderingBehavior, + test_indirect_selection: TestIndirectSelection, + on_warning_callback: Callable[..., Any] | None, + normalize_task_id: Callable[..., Any] | None = None, + detached_from_parent: dict[str, list[DbtNode]] | None = None, + **kwargs: Any, +) -> BaseOperator: + """ + Checks if a node_converter was supplied for the given resource type: + - If yes, attempts to convert the task using the given node_converter + - If no, creates the task as expected with the supplied task_meta + Returns the created task + """ + + conversion_function = node_converters.get(resource_type, create_airflow_task) + if conversion_function != create_airflow_task: + task_id = task_meta.id + logger.warning( + "The `node_converters` attribute is an experimental feature. " + "Its syntax and behavior can be changed before a major release." + ) + logger.debug(f"Converting node <{node.unique_id}> task <{task_id}> using <{conversion_function.__name__}>") + task = conversion_function( # type: ignore + dag=dag, + task_group=task_group, + dbt_project_name=dbt_project_name, + execution_mode=execution_mode, + task_args=task_args, + test_behavior=test_behavior, + source_rendering_behavior=source_rendering_behavior, + test_indirect_selection=test_indirect_selection, + on_warning_callback=on_warning_callback, + normalize_task_id=normalize_task_id, + node=node, + task_id=task_id, + detached_from_parent=detached_from_parent, + ) + if task is not None: + logger.debug(f"Conversion of node <{node.unique_id}> task <{task_id}> was successful!") + else: + logger.warning(f"Conversion of node <{node.unique_id}> task <{task_id}> failed") + else: + task = create_airflow_task(task_meta, dag, task_group) + return task + + def generate_task_or_group( dag: DAG, task_group: TaskGroup | None, @@ -353,9 +410,11 @@ def generate_task_or_group( test_behavior: TestBehavior, source_rendering_behavior: SourceRenderingBehavior, test_indirect_selection: TestIndirectSelection, + dbt_project_name: str, + node_converters: dict[DbtResourceType, Callable[..., Any]], on_warning_callback: Callable[..., Any] | None, normalize_task_id: Callable[..., Any] | None = None, - detached_from_parent: dict[str, DbtNode] | None = None, + detached_from_parent: dict[str, list[DbtNode]] | None = None, **kwargs: Any, ) -> BaseOperator | TaskGroup | None: task_or_group: BaseOperator | TaskGroup | None = None @@ -387,7 +446,23 @@ def generate_task_or_group( if task_meta and not node.resource_type == DbtResourceType.TEST: if use_task_group: with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group: - task = create_airflow_task(task_meta, dag, task_group=model_task_group) + task = generate_or_convert_task( + node_converters=node_converters, + task_meta=task_meta, + dbt_project_name=dbt_project_name, + dag=dag, + task_group=model_task_group, + node=node, + resource_type=node.resource_type, + execution_mode=execution_mode, + task_args=task_args, + test_behavior=test_behavior, + source_rendering_behavior=source_rendering_behavior, + test_indirect_selection=test_indirect_selection, + on_warning_callback=on_warning_callback, + normalize_task_id=normalize_task_id, + detached_from_parent=detached_from_parent, + ) test_meta = create_test_task_metadata( "test", execution_mode, @@ -397,11 +472,48 @@ def generate_task_or_group( on_warning_callback=on_warning_callback, detached_from_parent=detached_from_parent, ) - test_task = create_airflow_task(test_meta, dag, task_group=model_task_group) - task >> test_task - task_or_group = model_task_group + test_task = generate_or_convert_task( + node_converters=node_converters, + task_meta=test_meta, + dbt_project_name=dbt_project_name, + dag=dag, + task_group=model_task_group, + node=node, + resource_type=DbtResourceType.TEST, # type: ignore + execution_mode=execution_mode, + task_args=task_args, + test_behavior=test_behavior, + source_rendering_behavior=source_rendering_behavior, + test_indirect_selection=test_indirect_selection, + on_warning_callback=on_warning_callback, + normalize_task_id=normalize_task_id, + detached_from_parent=detached_from_parent, + ) + if task is not None and test_task is not None: + task >> test_task + task_or_group = model_task_group + elif task is not None: + task_or_group = task + else: + task_or_group = test_task else: - task_or_group = create_airflow_task(task_meta, dag, task_group=task_group) + task_or_group = generate_or_convert_task( + node_converters=node_converters, + task_meta=task_meta, + dbt_project_name=dbt_project_name, + dag=dag, + task_group=task_group, + node=node, + resource_type=node.resource_type, + execution_mode=execution_mode, + task_args=task_args, + test_behavior=test_behavior, + source_rendering_behavior=source_rendering_behavior, + test_indirect_selection=test_indirect_selection, + on_warning_callback=on_warning_callback, + normalize_task_id=normalize_task_id, + detached_from_parent=detached_from_parent, + ) return task_or_group @@ -592,29 +704,22 @@ def build_airflow_graph( identify_detached_nodes(nodes, render_config, detached_nodes, detached_from_parent) for node_id, node in nodes.items(): - conversion_function = node_converters.get(node.resource_type, generate_task_or_group) - if conversion_function != generate_task_or_group: - logger.warning( - "The `node_converters` attribute is an experimental feature. " - "Its syntax and behavior can be changed before a major release." - ) - logger.debug(f"Converting <{node.unique_id}> using <{conversion_function.__name__}>") - task_or_group = conversion_function( # type: ignore + task_or_group = generate_task_or_group( # type: ignore dag=dag, task_group=task_group, - dbt_project_name=dbt_project_name, + node=node, execution_mode=execution_mode, task_args=task_args, test_behavior=test_behavior, source_rendering_behavior=source_rendering_behavior, test_indirect_selection=test_indirect_selection, + node_converters=node_converters, + dbt_project_name=dbt_project_name, on_warning_callback=on_warning_callback, normalize_task_id=normalize_task_id, - node=node, detached_from_parent=detached_from_parent, ) if task_or_group is not None: - logger.debug(f"Conversion of <{node.unique_id}> was successful!") tasks_map[node_id] = task_or_group # If test_behaviour=="after_all", there will be one test task, run by the end of the DAG @@ -628,7 +733,23 @@ def build_airflow_graph( on_warning_callback=on_warning_callback, render_config=render_config, ) - test_task = create_airflow_task(test_meta, dag, task_group=task_group) + test_task = generate_or_convert_task( + node_converters=node_converters, + task_meta=test_meta, + dbt_project_name=dbt_project_name, + dag=dag, + task_group=task_group, + node=node, + resource_type=DbtResourceType.TEST, # type: ignore + execution_mode=execution_mode, + task_args=task_args, + test_behavior=test_behavior, + source_rendering_behavior=source_rendering_behavior, + test_indirect_selection=test_indirect_selection, + on_warning_callback=on_warning_callback, + normalize_task_id=normalize_task_id, + detached_from_parent=detached_from_parent, + ) leaves_ids = calculate_leaves(tasks_ids=list(tasks_map.keys()), nodes=nodes) for leaf_node_id in leaves_ids: tasks_map[leaf_node_id] >> test_task @@ -645,7 +766,23 @@ def build_airflow_graph( render_config=render_config, node=node, ) - test_task = create_airflow_task(test_meta, dag, task_group=task_group) + test_task = generate_or_convert_task( + node_converters=node_converters, + task_meta=test_meta, + dbt_project_name=dbt_project_name, + dag=dag, + task_group=task_group, + node=node, + resource_type=node.resource_type, + execution_mode=execution_mode, + task_args=task_args, + test_behavior=test_behavior, + source_rendering_behavior=source_rendering_behavior, + test_indirect_selection=test_indirect_selection, + on_warning_callback=on_warning_callback, + normalize_task_id=normalize_task_id, + detached_from_parent=detached_from_parent, + ) tasks_map[node_id] = test_task create_airflow_task_dependencies(nodes, tasks_map) diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 1cc2418966..a6a6825408 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -7,6 +7,7 @@ from airflow import __version__ as airflow_version from airflow.models import DAG from airflow.models.abstractoperator import DEFAULT_OWNER +from airflow.operators.empty import EmptyOperator from airflow.utils.task_group import TaskGroup from packaging import version @@ -32,6 +33,12 @@ from cosmos.converter import airflow_kwargs from cosmos.dbt.graph import DbtNode from cosmos.exceptions import CosmosValueError +from cosmos.operators.local import ( + DbtBuildLocalOperator, + DbtRunLocalOperator, + DbtSeedLocalOperator, + DbtTestLocalOperator, +) from cosmos.profiles import PostgresUserPasswordProfileMapping SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/") @@ -212,6 +219,8 @@ def test_create_task_group_for_after_each_supported_nodes(node_type: DbtResource ), ), }, + dbt_project_name="astro_shop", + node_converters={}, test_behavior=TestBehavior.AFTER_EACH, on_warning_callback=None, source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, @@ -977,6 +986,8 @@ def test_owner(dbt_extra_config, expected_owner): ), ), }, + dbt_project_name="astro_shop", + node_converters={}, test_behavior=TestBehavior.AFTER_EACH, on_warning_callback=None, source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, @@ -1037,3 +1048,140 @@ def test_add_teardown_task_raises_error_without_async_py_requirements(): with pytest.raises(CosmosValueError, match="ExecutionConfig.AIRFLOW_ASYNC needs async_py_requirements to be set"): _add_teardown_task(sample_dag, ExecutionMode.AIRFLOW_ASYNC, task_args, sample_tasks_map, None, None) + + +def convert_task(dag: DAG, task_group: TaskGroup, node: DbtNode, task_id: str, **kwargs): + """ + Converts task to an empty operator. Helper function to test node_converter logic. + """ + return EmptyOperator(dag=dag, task_group=task_group, task_id=task_id) + + +@pytest.mark.integration +@pytest.mark.parametrize( + "test_behavior,node_converters,expected_task_types", + [ + ( + TestBehavior.AFTER_EACH, + {DbtResourceType("test"): convert_task}, + { + "seed_parent_seed": DbtSeedLocalOperator, + "parent.run": DbtRunLocalOperator, + "parent.test": EmptyOperator, + "child_run": DbtRunLocalOperator, + "child2_v2_run": DbtRunLocalOperator, + }, + ), + ( + TestBehavior.AFTER_EACH, + {DbtResourceType("model"): convert_task}, + { + "seed_parent_seed": DbtSeedLocalOperator, + "parent.run": EmptyOperator, + "parent.test": DbtTestLocalOperator, + "child_run": EmptyOperator, + "child2_v2_run": EmptyOperator, + }, + ), + ( + TestBehavior.AFTER_ALL, + {DbtResourceType("test"): convert_task}, + { + "seed_parent_seed": DbtSeedLocalOperator, + "parent_run": DbtRunLocalOperator, + "astro_shop_test": EmptyOperator, + "child_run": DbtRunLocalOperator, + "child2_v2_run": DbtRunLocalOperator, + }, + ), + ( + TestBehavior.AFTER_ALL, + {DbtResourceType("model"): convert_task}, + { + "seed_parent_seed": DbtSeedLocalOperator, + "parent_run": EmptyOperator, + "astro_shop_test": DbtTestLocalOperator, + "child_run": EmptyOperator, + "child2_v2_run": EmptyOperator, + }, + ), + ( + TestBehavior.BUILD, + {DbtResourceType("test"): convert_task}, + { + "seed_parent_seed_build": DbtBuildLocalOperator, + "parent_model_build": DbtBuildLocalOperator, + "child_model_build": DbtBuildLocalOperator, + "child2_v2_model_build": DbtBuildLocalOperator, + }, + ), + ( + TestBehavior.BUILD, + {DbtResourceType("model"): convert_task}, + { + "seed_parent_seed_build": DbtBuildLocalOperator, + "parent_model_build": EmptyOperator, + "child_model_build": EmptyOperator, + "child2_v2_model_build": EmptyOperator, + }, + ), + ( + TestBehavior.NONE, + {DbtResourceType("test"): convert_task}, + { + "seed_parent_seed": DbtSeedLocalOperator, + "parent_run": DbtRunLocalOperator, + "child_run": DbtRunLocalOperator, + "child2_v2_run": DbtRunLocalOperator, + }, + ), + ( + TestBehavior.NONE, + {DbtResourceType("model"): convert_task}, + { + "seed_parent_seed": DbtSeedLocalOperator, + "parent_run": EmptyOperator, + "child_run": EmptyOperator, + "child2_v2_run": EmptyOperator, + }, + ), + ], +) +def test_build_airflow_graph_with_node_convert(test_behavior, node_converters, expected_task_types): + """ + Tests node converter logic for different test behaviors. + Seed, Model, Snapshot, and Source should work fairly similarly in all situations, + so we'll choose just one of those DBT resource types (Model) + as well as Tests which behave very differently. + """ + + with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: + task_args = { + "project_dir": SAMPLE_PROJ_PATH, + "conn_id": "fake_conn", + "profile_config": ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="fake_conn", + profile_args={"schema": "public"}, + ), + ), + } + build_airflow_graph( + nodes=sample_nodes, + dag=dag, + execution_mode=ExecutionMode.LOCAL, + test_indirect_selection=TestIndirectSelection.EAGER, + task_args=task_args, + render_config=RenderConfig( + test_behavior=test_behavior, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + node_converters=node_converters, + ), + dbt_project_name="astro_shop", + ) + + assert len(dag.task_dict) == len(expected_task_types) + for id, task in dag.task_dict.items(): + assert isinstance(task, expected_task_types[id]) From b8b5cbe2cf6971fe89a8e300c3d2dae8a136699a Mon Sep 17 00:00:00 2001 From: Anya Conti Date: Thu, 8 May 2025 18:26:09 -0400 Subject: [PATCH 02/23] Setting task type --- cosmos/airflow/graph.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index c4ee51fd05..4617e96794 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -368,6 +368,7 @@ def generate_or_convert_task( - If no, creates the task as expected with the supplied task_meta Returns the created task """ + task: BaseOperator conversion_function = node_converters.get(resource_type, create_airflow_task) if conversion_function != create_airflow_task: From 412dca8acf51b39753fe7293ce2965b1bb0c5efb Mon Sep 17 00:00:00 2001 From: Anya Conti Date: Fri, 16 May 2025 16:36:35 -0400 Subject: [PATCH 03/23] Fixing test coverage --- cosmos/airflow/graph.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 4617e96794..0fde020486 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -395,8 +395,6 @@ def generate_or_convert_task( ) if task is not None: logger.debug(f"Conversion of node <{node.unique_id}> task <{task_id}> was successful!") - else: - logger.warning(f"Conversion of node <{node.unique_id}> task <{task_id}> failed") else: task = create_airflow_task(task_meta, dag, task_group) return task @@ -490,13 +488,8 @@ def generate_task_or_group( normalize_task_id=normalize_task_id, detached_from_parent=detached_from_parent, ) - if task is not None and test_task is not None: - task >> test_task - task_or_group = model_task_group - elif task is not None: - task_or_group = task - else: - task_or_group = test_task + task >> test_task + task_or_group = model_task_group else: task_or_group = generate_or_convert_task( node_converters=node_converters, From 71d7bd5dccb0f55575835a1778dec3a02b0f742b Mon Sep 17 00:00:00 2001 From: Anya Conti Date: Mon, 29 Sep 2025 17:26:53 -0400 Subject: [PATCH 04/23] Allows the ability to set if node_conversion happens at the task or task group level --- cosmos/airflow/graph.py | 26 ++++++++++++++++---------- cosmos/config.py | 2 ++ docs/configuration/render-config.rst | 2 +- tests/airflow/test_graph.py | 10 ++++++++++ 4 files changed, 29 insertions(+), 11 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 0324df68f0..c16f76a276 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -390,9 +390,7 @@ def is_detached_test(node: DbtNode) -> bool: def generate_or_convert_task( - node_converters: dict[DbtResourceType, Callable[..., Any]], task_meta: TaskMetadata, - dbt_project_name: str, dag: DAG, task_group: TaskGroup | None, node: DbtNode, @@ -403,6 +401,8 @@ def generate_or_convert_task( source_rendering_behavior: SourceRenderingBehavior, test_indirect_selection: TestIndirectSelection, on_warning_callback: Callable[..., Any] | None, + dbt_project_name: str | None = None, + node_converters: dict[DbtResourceType, Callable[..., Any]] = {}, normalize_task_id: Callable[..., Any] | None = None, detached_from_parent: dict[str, list[DbtNode]] | None = None, **kwargs: Any, @@ -415,8 +415,8 @@ def generate_or_convert_task( """ task: BaseOperator - conversion_function = node_converters.get(resource_type, create_airflow_task) - if conversion_function != create_airflow_task: + conversion_function = node_converters.get(resource_type, None) + if conversion_function is not None: task_id = task_meta.id logger.warning( "The `node_converters` attribute is an experimental feature. " @@ -454,23 +454,26 @@ def generate_task_or_group( test_behavior: TestBehavior, source_rendering_behavior: SourceRenderingBehavior, test_indirect_selection: TestIndirectSelection, - dbt_project_name: str, - node_converters: dict[DbtResourceType, Callable[..., Any]], on_warning_callback: Callable[..., Any] | None, + dbt_project_name: str | None = None, + node_converters: dict[DbtResourceType, Callable[..., Any]] = {}, normalize_task_id: Callable[..., Any] | None = None, normalize_task_display_name: Callable[..., Any] | None = None, detached_from_parent: dict[str, list[DbtNode]] | None = None, enable_owner_inheritance: bool | None = None, + node_conversion_by_task_group: bool | None = None, **kwargs: Any, ) -> BaseOperator | TaskGroup | None: task_or_group: BaseOperator | TaskGroup | None = None detached_from_parent = detached_from_parent or {} + node_converters = node_converters or {} use_task_group = ( node.resource_type in TESTABLE_DBT_RESOURCES and test_behavior == TestBehavior.AFTER_EACH and node.has_test is True ) + convert_entire_task_group = node_conversion_by_task_group and node.resource_type in node_converters task_meta = create_task_metadata( node=node, @@ -492,7 +495,7 @@ def generate_task_or_group( # The exception are the test nodes, since it would be too slow to run test tasks individually. # If test_behaviour=="after_each", each model task will be bundled with a test task, using TaskGroup if task_meta and not node.resource_type == DbtResourceType.TEST: - if use_task_group: + if use_task_group and (not convert_entire_task_group): with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group: task = generate_or_convert_task( node_converters=node_converters, @@ -698,7 +701,7 @@ def _add_teardown_task( tasks_map[DBT_TEARDOWN_ASYNC_TASK_ID] = teardown_airflow_task -def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astronomer-cosmos/issues/1943 +def build_airflow_graph( nodes: dict[str, DbtNode], dag: DAG, # Airflow-specific - parent DAG where to associate tasks and (optional) task groups execution_mode: ExecutionMode, # Cosmos-specific - decide what which class to use @@ -741,7 +744,7 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro normalize_task_display_name = render_config.normalize_task_display_name enable_owner_inheritance = render_config.enable_owner_inheritance tasks_map: dict[str, Union[TaskGroup, BaseOperator]] = {} - task_or_group: TaskGroup | BaseOperator + task_or_group: TaskGroup | BaseOperator | None # Identify test nodes that should be run detached from the associated dbt resource nodes because they # have multiple parents @@ -754,7 +757,7 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro virtualenv_dir = task_args.pop("virtualenv_dir", None) for node_id, node in nodes.items(): - task_or_group = generate_task_or_group( # type: ignore + task_or_group = generate_task_or_group( dag=dag, task_group=task_group, node=node, @@ -770,6 +773,7 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro normalize_task_display_name=normalize_task_display_name, detached_from_parent=detached_from_parent, enable_owner_inheritance=enable_owner_inheritance, + node_conversion_by_task_group=render_config.node_conversion_by_task_group, ) if task_or_group is not None: tasks_map[node_id] = task_or_group @@ -802,6 +806,7 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro on_warning_callback=on_warning_callback, normalize_task_id=normalize_task_id, detached_from_parent=detached_from_parent, + node_conversion_by_task_group=render_config.node_conversion_by_task_group, ) leaves_ids = calculate_leaves(tasks_ids=list(tasks_map.keys()), nodes=nodes) for leaf_node_id in leaves_ids: @@ -836,6 +841,7 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro on_warning_callback=on_warning_callback, normalize_task_id=normalize_task_id, detached_from_parent=detached_from_parent, + node_conversion_by_task_group=render_config.node_conversion_by_task_group, ) tasks_map[node_id] = test_task diff --git a/cosmos/config.py b/cosmos/config.py index e1a4a0c353..f16a297ed3 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -68,6 +68,7 @@ class RenderConfig: :param normalize_task_display_name: A callable that takes a dbt node as input and returns the task display name. This allows users to assign a custom task display name separate from the node ID. :param should_detach_multiple_parents_tests: A boolean that allows users to decide whether to run tests with multiple parent dependencies in separate tasks. :param enable_owner_inheritance: A boolean that allows users to enable the owner inheritance from dbt models to airflow tasks. Defaults to True. + :param node_conversion_by_task_group: A boolean that allows users to do node conversion at the task group level instead of task level. Defaults to True. """ emit_datasets: bool = True @@ -91,6 +92,7 @@ class RenderConfig: normalize_task_display_name: Callable[..., Any] | None = None should_detach_multiple_parents_tests: bool = False enable_owner_inheritance: bool | None = True + node_conversion_by_task_group: bool | None = True def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 26af118fe0..1284edbd7c 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -26,7 +26,7 @@ The ``RenderConfig`` class takes the following arguments: - ``normalize_task_display_name``: This function allows users to set a custom user-defined function to alter the display name independently of the model name. This way, the task_id can be preserved while the model display name is modified. - ``should_detach_multiple_parents_tests``: A boolean to control if tests that depend on multiple parents should be run as standalone tasks. See `Parsing Methods `_ for more information. - ``enable_owner_inheritance``: (introduced in 1.10.2) A boolean to control if dbt owners should be imported as part of the airflow DAG owners. Defaults to True. - +- ``node_conversion_by_task_group``: A boolean to control if node_converters are used at the task group level (ex. converting models with test_behavior=AFTER_EACH means the entire task group is converted including the run task and the test task), or the individual task level (gives more granularity for converting just the run tasks or just the test tasks). Defaults to True. How to run dbt ls (invocation mode) ----------------------------------- diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 1c62dea723..84f76a8c8b 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -1158,6 +1158,15 @@ def test_add_teardown_task_raises_error_without_async_py_requirements(): _add_teardown_task(sample_dag, ExecutionMode.AIRFLOW_ASYNC, task_args, sample_tasks_map, None, None) +@pytest.mark.parametrize( + "enable_owner_inheritance,node_owner,expected_owner", + [ + (True, "dbt-owner", "dbt-owner"), # Default behavior - inherit owner + (False, "dbt-owner", ""), # Disable inheritance - empty string + (True, "", ""), # No owner to inherit - empty string + (False, "", ""), # No owner to inherit, disable inheritance - empty string + ], +) def test_create_task_metadata_disable_owner_inheritance(enable_owner_inheritance, node_owner, expected_owner): """Test that enable_owner_inheritance parameter works correctly in create_task_metadata.""" node = DbtNode( @@ -1543,6 +1552,7 @@ def test_build_airflow_graph_with_node_convert(test_behavior, node_converters, e test_behavior=test_behavior, source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, node_converters=node_converters, + node_conversion_by_task_group=False, ), dbt_project_name="astro_shop", ) From 787b24f55f54aecd3e938a23edb7418174687861 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 28 Oct 2025 14:11:07 +0000 Subject: [PATCH 05/23] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/airflow/test_graph.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 3246b193a5..cc0e3a19cd 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -12,13 +12,13 @@ try: # Airflow 3.1 onwards + from airflow.providers.standard.operators.empty import EmptyOperator from airflow.sdk import TaskGroup from airflow.sdk.definitions._internal.abstractoperator import DEFAULT_OWNER - from airflow.providers.standard.operators.empty import EmptyOperator except ImportError: - from airflow.utils.task_group import TaskGroup from airflow.models.abstractoperator import DEFAULT_OWNER from airflow.operators.empty import EmptyOperator + from airflow.utils.task_group import TaskGroup from packaging import version From 015239779d5e2c5f25e862561ce831514258ece3 Mon Sep 17 00:00:00 2001 From: Anya Conti Date: Tue, 28 Oct 2025 10:24:38 -0400 Subject: [PATCH 06/23] Fixing duplicate import from merging in main --- tests/airflow/test_graph.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index cc0e3a19cd..1b48cbea56 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -7,7 +7,6 @@ from airflow import __version__ as airflow_version from airflow.models import DAG -from cosmos import DbtTestLocalOperator from cosmos.operators.watcher import DbtTestWatcherOperator try: From cd20bff84b049a7a4a8fedc6da7700a52dc39b2e Mon Sep 17 00:00:00 2001 From: Anya Conti Date: Tue, 28 Oct 2025 13:36:13 -0400 Subject: [PATCH 07/23] Clean up extra parameter --- cosmos/airflow/graph.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index db6ea25c0f..6a9c14a390 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -924,7 +924,6 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro on_warning_callback=on_warning_callback, normalize_task_id=normalize_task_id, detached_from_parent=detached_from_parent, - node_conversion_by_task_group=render_config.node_conversion_by_task_group, ) leaves_ids = calculate_leaves(tasks_ids=list(tasks_map.keys()), nodes=nodes) for leaf_node_id in leaves_ids: @@ -959,7 +958,6 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro on_warning_callback=on_warning_callback, normalize_task_id=normalize_task_id, detached_from_parent=detached_from_parent, - node_conversion_by_task_group=render_config.node_conversion_by_task_group, ) tasks_map[node_id] = test_task From ba289e790cdcf02d12e2363937ebcf0dd768e573 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 4 Nov 2025 10:56:45 +0000 Subject: [PATCH 08/23] Move converter docs closer together --- docs/configuration/render-config.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 5399f4cb85..033d2d7604 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -15,6 +15,7 @@ The ``RenderConfig`` class takes the following arguments: - ``selector``: (new in v1.3) name of a dbt YAML selector to use for DAG parsing. Only supported when using ``load_method=LoadMode.DBT_LS``. See `Selecting & Excluding `_ for more information. - ``dbt_deps``: (deprecated in v1.9, use ``ProjectConfig.install_dbt_deps`` onwards) A Boolean to run dbt deps when using dbt ls for dag parsing. Default True - ``node_converters``: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. Find more information below. +- ``node_conversion_by_task_group``: A boolean to control if node_converters are used at the task group level (ex. converting models with test_behavior=AFTER_EACH means the entire task group is converted including the run task and the test task), or the individual task level (gives more granularity for converting just the run tasks or just the test tasks). Defaults to True. - ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. - ``dbt_ls_path``: Should be set when using ``load_method=LoadMode.DBT_LS_OUTPUT``. Path of the user-managed output of ``dbt ls``. - ``enable_mock_profile``: When using ``LoadMode.DBT_LS`` with a ``ProfileMapping`` class, by default, Cosmos mocks the values of the profile. Defaults to True. In order to leverage partial parsing, this argument should be set to ``False``. Read `Partial parsing <./partial-parsing.html#profile-configuration.html>`_ for more information. @@ -27,7 +28,7 @@ The ``RenderConfig`` class takes the following arguments: - ``normalize_task_display_name``: This function allows users to set a custom user-defined function to alter the display name independently of the model name. This way, the task_id can be preserved while the model display name is modified. - ``should_detach_multiple_parents_tests``: A boolean to control if tests that depend on multiple parents should be run as standalone tasks. See `Parsing Methods `_ for more information. - ``enable_owner_inheritance``: (introduced in 1.10.2) A boolean to control if dbt owners should be imported as part of the airflow DAG owners. Defaults to True. -- ``node_conversion_by_task_group``: A boolean to control if node_converters are used at the task group level (ex. converting models with test_behavior=AFTER_EACH means the entire task group is converted including the run task and the test task), or the individual task level (gives more granularity for converting just the run tasks or just the test tasks). Defaults to True. + How to run dbt ls (invocation mode) ----------------------------------- From ee4491dff950821713bae9743bbd60874ed62c6a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 4 Nov 2025 13:04:47 +0000 Subject: [PATCH 09/23] Refactor airflow/graph.py to simplify interfaces since #1759 --- cosmos/airflow/graph.py | 266 +++++++++++++++------------------------- 1 file changed, 101 insertions(+), 165 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 6a9c14a390..34f6032077 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -289,16 +289,11 @@ def create_task_metadata( execution_mode: ExecutionMode, args: dict[str, Any], dbt_dag_task_group_identifier: str, + render_config: RenderConfig, use_task_group: bool = False, - source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, - source_pruning: bool = False, - normalize_task_id: Callable[..., Any] | None = None, - normalize_task_display_name: Callable[..., Any] | None = None, - test_behavior: TestBehavior = TestBehavior.AFTER_ALL, test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER, on_warning_callback: Callable[..., Any] | None = None, detached_from_parent: dict[str, list[DbtNode]] | None = None, - enable_owner_inheritance: bool | None = None, filtered_nodes: dict[str, DbtNode] | None = None, ) -> TaskMetadata | None: """ @@ -316,7 +311,7 @@ def create_task_metadata( :param detached_from_parent: Dictionary that maps node ids and their children tests that should be run detached :returns: The metadata necessary to instantiate the source dbt node as an Airflow task. """ - dbt_resource_to_class = create_dbt_resource_to_class(test_behavior) + dbt_resource_to_class = create_dbt_resource_to_class(render_config.test_behavior) # Make a copy to avoid issues with mutable arguments args = {**args} @@ -329,7 +324,7 @@ def create_task_metadata( } resource_suffix_map = {TestBehavior.BUILD: "build", DbtResourceType.MODEL: "run"} resource_suffix = ( - resource_suffix_map.get(test_behavior) + resource_suffix_map.get(render_config.test_behavior) or resource_suffix_map.get(node.resource_type) or node.resource_type.value ) @@ -340,7 +335,7 @@ def create_task_metadata( # `AIRFLOW__COSMOS__PRE_DBT_FUSION=1`. models_select_key = "models" if settings.pre_dbt_fusion else "select" - if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: + if render_config.test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: args[models_select_key] = f"{node.resource_name}" if test_indirect_selection != TestIndirectSelection.EAGER: args["indirect_selection"] = test_indirect_selection.value @@ -350,8 +345,8 @@ def create_task_metadata( node=node, args=args, use_task_group=use_task_group, - normalize_task_id=normalize_task_id, - normalize_task_display_name=normalize_task_display_name, + normalize_task_id=render_config.normalize_task_id, + normalize_task_display_name=render_config.normalize_task_display_name, resource_suffix=resource_suffix, include_resource_type=True, execution_mode=execution_mode, @@ -360,25 +355,29 @@ def create_task_metadata( args["select"] = f"source:{node.resource_name}" args["on_warning_callback"] = on_warning_callback - if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( - source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS + if (render_config.source_rendering_behavior == SourceRenderingBehavior.NONE) or ( + render_config.source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS and node.has_freshness is False and node.has_test is False ): return None - if source_pruning and filtered_nodes and not _is_source_used_by_filtered_nodes(node, filtered_nodes): + if ( + render_config.source_pruning + and filtered_nodes + and not _is_source_used_by_filtered_nodes(node, filtered_nodes) + ): return None task_id, args = _get_task_id_and_args( node=node, args=args, use_task_group=use_task_group, - normalize_task_id=normalize_task_id, - normalize_task_display_name=normalize_task_display_name, + normalize_task_id=render_config.normalize_task_id, + normalize_task_display_name=render_config.normalize_task_display_name, resource_suffix=r"source", execution_mode=execution_mode, ) - if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL: + if node.has_freshness is False and render_config.source_rendering_behavior == SourceRenderingBehavior.ALL: # render sources without freshness as empty operators # empty operator does not accept custom parameters (e.g., profile_args). recreate the args. if "task_display_name" in args: @@ -392,8 +391,8 @@ def create_task_metadata( node=node, args=args, use_task_group=use_task_group, - normalize_task_id=normalize_task_id, - normalize_task_display_name=normalize_task_display_name, + normalize_task_id=render_config.normalize_task_id, + normalize_task_display_name=render_config.normalize_task_display_name, resource_suffix=resource_suffix, execution_mode=execution_mode, ) @@ -402,7 +401,7 @@ def create_task_metadata( task_owner = node.owner - if not enable_owner_inheritance: + if not render_config.enable_owner_inheritance: task_owner = "" task_metadata = TaskMetadata( @@ -441,15 +440,15 @@ def generate_or_convert_task( task_group: TaskGroup | None, node: DbtNode, resource_type: DbtResourceType, - execution_mode: ExecutionMode, task_args: dict[str, Any], - test_behavior: TestBehavior, - source_rendering_behavior: SourceRenderingBehavior, - test_indirect_selection: TestIndirectSelection, + render_config: RenderConfig, + node_converters: dict[DbtResourceType, Callable[..., Any]], on_warning_callback: Callable[..., Any] | None, + # Properties from ExecutionConfig: + execution_mode: ExecutionMode, + test_indirect_selection: TestIndirectSelection, + # Other arguments relevant to instantiating the task: dbt_project_name: str | None = None, - node_converters: dict[DbtResourceType, Callable[..., Any]] = {}, - normalize_task_id: Callable[..., Any] | None = None, detached_from_parent: dict[str, list[DbtNode]] | None = None, **kwargs: Any, ) -> BaseOperator: @@ -475,11 +474,11 @@ def generate_or_convert_task( dbt_project_name=dbt_project_name, execution_mode=execution_mode, task_args=task_args, - test_behavior=test_behavior, - source_rendering_behavior=source_rendering_behavior, + test_behavior=render_config.test_behavior, + source_rendering_behavior=render_config.source_rendering_behavior, test_indirect_selection=test_indirect_selection, on_warning_callback=on_warning_callback, - normalize_task_id=normalize_task_id, + normalize_task_id=render_config.normalize_task_id, node=node, task_id=task_id, detached_from_parent=detached_from_parent, @@ -495,75 +494,66 @@ def generate_task_or_group( dag: DAG, task_group: TaskGroup | None, node: DbtNode, - execution_mode: ExecutionMode, task_args: dict[str, Any], - test_behavior: TestBehavior, - source_rendering_behavior: SourceRenderingBehavior, + render_config: RenderConfig, + node_converters: dict[DbtResourceType, Callable[..., Any]], + # These two properties come from ExecutionConfig and affect rendering decisions: + execution_mode: ExecutionMode, test_indirect_selection: TestIndirectSelection, + # Other arguments relevant to instantiating that task or task group: dbt_project_name: str | None = None, - node_converters: dict[DbtResourceType, Callable[..., Any]] = {}, - source_pruning: bool = False, on_warning_callback: Callable[..., Any] | None = None, - normalize_task_id: Callable[..., Any] | None = None, - normalize_task_display_name: Callable[..., Any] | None = None, detached_from_parent: dict[str, list[DbtNode]] | None = None, - enable_owner_inheritance: bool | None = None, - node_conversion_by_task_group: bool | None = None, filtered_nodes: dict[str, DbtNode] | None = None, **kwargs: Any, ) -> BaseOperator | TaskGroup | None: + task_or_group: BaseOperator | TaskGroup | None = None detached_from_parent = detached_from_parent or {} - node_converters = node_converters or {} - use_task_group = ( node.resource_type in TESTABLE_DBT_RESOURCES - and test_behavior == TestBehavior.AFTER_EACH + and render_config.test_behavior == TestBehavior.AFTER_EACH and node.has_test is True ) - convert_entire_task_group = node_conversion_by_task_group and node.resource_type in node_converters + convert_entire_task_group = render_config.node_conversion_by_task_group and node.resource_type in node_converters task_meta = create_task_metadata( node=node, + render_config=render_config, execution_mode=execution_mode, args=task_args, dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group), use_task_group=use_task_group, - source_rendering_behavior=source_rendering_behavior, - source_pruning=source_pruning, - normalize_task_id=normalize_task_id, - normalize_task_display_name=normalize_task_display_name, - test_behavior=test_behavior, test_indirect_selection=test_indirect_selection, on_warning_callback=on_warning_callback, detached_from_parent=detached_from_parent, - enable_owner_inheritance=enable_owner_inheritance, filtered_nodes=filtered_nodes, ) + generate_or_convert_task_args = { + "task_meta": task_meta, + "dbt_project_name": dbt_project_name, + "dag": dag, + "task_group": task_group, + "node": node, + "resource_type": node.resource_type, + "task_args": task_args, + "render_config": render_config, + "on_warning_callback": on_warning_callback, + "detached_from_parent": detached_from_parent, + "node_converters": node_converters, + # Properties from ExecutionConfig: + "execution_mode": execution_mode, + "test_indirect_selection": test_indirect_selection, + } + # In most cases, we'll map one DBT node to one Airflow task # The exception are the test nodes, since it would be too slow to run test tasks individually. # If test_behaviour=="after_each", each model task will be bundled with a test task, using TaskGroup if task_meta and not node.resource_type == DbtResourceType.TEST: if use_task_group and (not convert_entire_task_group): with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group: - task = generate_or_convert_task( - node_converters=node_converters, - task_meta=task_meta, - dbt_project_name=dbt_project_name, - dag=dag, - task_group=model_task_group, - node=node, - resource_type=node.resource_type, - execution_mode=execution_mode, - task_args=task_args, - test_behavior=test_behavior, - source_rendering_behavior=source_rendering_behavior, - test_indirect_selection=test_indirect_selection, - on_warning_callback=on_warning_callback, - normalize_task_id=normalize_task_id, - detached_from_parent=detached_from_parent, - ) + task = generate_or_convert_task(**generate_or_convert_task_args) # type: ignore[arg-type] test_meta = create_test_task_metadata( "test", execution_mode, @@ -572,45 +562,18 @@ def generate_task_or_group( node=node, on_warning_callback=on_warning_callback, detached_from_parent=detached_from_parent, - enable_owner_inheritance=enable_owner_inheritance, - ) - test_task = generate_or_convert_task( - node_converters=node_converters, - task_meta=test_meta, - dbt_project_name=dbt_project_name, - dag=dag, - task_group=model_task_group, - node=node, - resource_type=DbtResourceType.TEST, # type: ignore - execution_mode=execution_mode, - task_args=task_args, - test_behavior=test_behavior, - source_rendering_behavior=source_rendering_behavior, - test_indirect_selection=test_indirect_selection, - on_warning_callback=on_warning_callback, - normalize_task_id=normalize_task_id, - detached_from_parent=detached_from_parent, + enable_owner_inheritance=render_config.enable_owner_inheritance, ) + test_task_generate_or_convert_task_args = { + **generate_or_convert_task_args, + "task_meta": test_meta, + "resource_type": DbtResourceType.TEST, # type: ignore + } + test_task = generate_or_convert_task(**test_task_generate_or_convert_task_args) # type: ignore[arg-type] task >> test_task task_or_group = model_task_group else: - task_or_group = generate_or_convert_task( - node_converters=node_converters, - task_meta=task_meta, - dbt_project_name=dbt_project_name, - dag=dag, - task_group=task_group, - node=node, - resource_type=node.resource_type, - execution_mode=execution_mode, - task_args=task_args, - test_behavior=test_behavior, - source_rendering_behavior=source_rendering_behavior, - test_indirect_selection=test_indirect_selection, - on_warning_callback=on_warning_callback, - normalize_task_id=normalize_task_id, - detached_from_parent=detached_from_parent, - ) + task_or_group = generate_or_convert_task(**generate_or_convert_task_args) # type: ignore[arg-type] return task_or_group @@ -852,13 +815,6 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro and “test_results” of type List. :return: Dictionary mapping dbt nodes (node.unique_id to Airflow task) """ - node_converters = render_config.node_converters or {} - test_behavior = render_config.test_behavior - source_rendering_behavior = render_config.source_rendering_behavior - source_pruning = render_config.source_pruning - normalize_task_id = render_config.normalize_task_id - normalize_task_display_name = render_config.normalize_task_display_name - enable_owner_inheritance = render_config.enable_owner_inheritance tasks_map: dict[str, Union[TaskGroup, BaseOperator]] = {} task_or_group: TaskGroup | BaseOperator | None @@ -869,36 +825,38 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro identify_detached_nodes(nodes, render_config, detached_nodes, detached_from_parent) virtualenv_dir = None + if execution_mode == ExecutionMode.AIRFLOW_ASYNC: + # This property is only relevant for the setup task, not the other tasks: virtualenv_dir = task_args.pop("virtualenv_dir", None) for node_id, node in nodes.items(): - task_or_group = generate_task_or_group( - dag=dag, - task_group=task_group, - node=node, - execution_mode=execution_mode, - task_args=task_args, - test_behavior=test_behavior, - source_rendering_behavior=source_rendering_behavior, - source_pruning=source_pruning, - test_indirect_selection=test_indirect_selection, - node_converters=node_converters, - dbt_project_name=dbt_project_name, - on_warning_callback=on_warning_callback, - normalize_task_id=normalize_task_id, - normalize_task_display_name=normalize_task_display_name, - detached_from_parent=detached_from_parent, - enable_owner_inheritance=enable_owner_inheritance, - node_conversion_by_task_group=render_config.node_conversion_by_task_group, - filtered_nodes=nodes, - ) + + task_or_group_args = { + # Arguments to this method: + "dag": dag, + "task_group": task_group, + "node": node, + "task_args": task_args, + "dbt_project_name": dbt_project_name, + "render_config": render_config, + # Properties from ExecutionConfig: + "execution_mode": execution_mode, + "test_indirect_selection": test_indirect_selection, + # Argument to DbtDag or DbtTaskGroup: + "on_warning_callback": on_warning_callback, + # Calculated in this method: + "detached_from_parent": detached_from_parent, + "node_converters": render_config.node_converters or {}, + } + + task_or_group = generate_task_or_group(**task_or_group_args, filtered_nodes=nodes) # type: ignore[arg-type] if task_or_group is not None: tasks_map[node_id] = task_or_group # If test_behaviour=="after_all", there will be one test task, run by the end of the DAG # The end of a DAG is defined by the DAG leaf tasks (tasks which do not have downstream tasks) - if test_behavior == TestBehavior.AFTER_ALL: + if render_config.test_behavior == TestBehavior.AFTER_ALL: test_meta = create_test_task_metadata( f"{dbt_project_name}_test", execution_mode, @@ -906,29 +864,18 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro task_args=task_args, on_warning_callback=on_warning_callback, render_config=render_config, - enable_owner_inheritance=enable_owner_inheritance, - ) - test_task = generate_or_convert_task( - node_converters=node_converters, - task_meta=test_meta, - dbt_project_name=dbt_project_name, - dag=dag, - task_group=task_group, - node=node, - resource_type=DbtResourceType.TEST, # type: ignore - execution_mode=execution_mode, - task_args=task_args, - test_behavior=test_behavior, - source_rendering_behavior=source_rendering_behavior, - test_indirect_selection=test_indirect_selection, - on_warning_callback=on_warning_callback, - normalize_task_id=normalize_task_id, - detached_from_parent=detached_from_parent, + enable_owner_inheritance=render_config.enable_owner_inheritance, ) + test_task_args = { + **task_or_group_args, + "task_meta": test_meta, + "resource_type": DbtResourceType.TEST, # type: ignore + } + test_task = generate_or_convert_task(**test_task_args) # type: ignore[arg-type] leaves_ids = calculate_leaves(tasks_ids=list(tasks_map.keys()), nodes=nodes) for leaf_node_id in leaves_ids: tasks_map[leaf_node_id] >> test_task - elif test_behavior in (TestBehavior.BUILD, TestBehavior.AFTER_EACH): + elif render_config.test_behavior in (TestBehavior.BUILD, TestBehavior.AFTER_EACH): # Handle detached test nodes for node_id, node in detached_nodes.items(): datached_node_name = calculate_detached_node_name(node) @@ -940,25 +887,14 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro on_warning_callback=on_warning_callback, render_config=render_config, node=node, - enable_owner_inheritance=enable_owner_inheritance, - ) - test_task = generate_or_convert_task( - node_converters=node_converters, - task_meta=test_meta, - dbt_project_name=dbt_project_name, - dag=dag, - task_group=task_group, - node=node, - resource_type=node.resource_type, - execution_mode=execution_mode, - task_args=task_args, - test_behavior=test_behavior, - source_rendering_behavior=source_rendering_behavior, - test_indirect_selection=test_indirect_selection, - on_warning_callback=on_warning_callback, - normalize_task_id=normalize_task_id, - detached_from_parent=detached_from_parent, + enable_owner_inheritance=render_config.enable_owner_inheritance, ) + test_task_args = { + **task_or_group_args, + "task_meta": test_meta, + "resource_type": node.resource_type, # type: ignore + } + test_task = generate_or_convert_task(**test_task_args) # type: ignore[arg-type] tasks_map[node_id] = test_task create_airflow_task_dependencies(nodes, tasks_map) From 9088cb220c4a0b893a2996a7078e302a447941d0 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 4 Nov 2025 13:12:50 +0000 Subject: [PATCH 10/23] Move node_conversion arguments together --- cosmos/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/config.py b/cosmos/config.py index 7049c450c3..8945c502d8 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -88,6 +88,7 @@ class RenderConfig: selector: str | None = None dbt_deps: bool | None = None node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None + node_conversion_by_task_group: bool | None = True dbt_executable_path: str | Path = get_system_dbt() env_vars: dict[str, str] | None = None dbt_project_path: InitVar[str | Path | None] = None @@ -101,7 +102,6 @@ class RenderConfig: normalize_task_display_name: Callable[..., Any] | None = None should_detach_multiple_parents_tests: bool = False enable_owner_inheritance: bool | None = True - node_conversion_by_task_group: bool | None = True def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: From 9d0826f6f403a3bd890ac233f5cd4d2bc45f7ff7 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 4 Nov 2025 13:32:05 +0000 Subject: [PATCH 11/23] Fix unittests --- cosmos/airflow/graph.py | 2 +- tests/airflow/test_graph.py | 41 +++++++++++------- tests/dbt/test_pruning.py | 83 ++++++++++++++++++++++++------------- 3 files changed, 83 insertions(+), 43 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 34f6032077..5815127fe9 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -289,7 +289,7 @@ def create_task_metadata( execution_mode: ExecutionMode, args: dict[str, Any], dbt_dag_task_group_identifier: str, - render_config: RenderConfig, + render_config: RenderConfig = RenderConfig(), use_task_group: bool = False, test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER, on_warning_callback: Callable[..., Any] | None = None, diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 1b48cbea56..1cf7bc0a8a 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -231,9 +231,11 @@ def test_create_task_group_for_after_each_supported_nodes(node_type: DbtResource }, dbt_project_name="astro_shop", node_converters={}, - test_behavior=TestBehavior.AFTER_EACH, + render_config=RenderConfig( + test_behavior=TestBehavior.AFTER_EACH, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + ), on_warning_callback=None, - source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, ) assert isinstance(output, TaskGroup) assert list(output.children.keys()) == [f"dbt_node.{task_suffix}", "dbt_node.test"] @@ -615,7 +617,9 @@ def test_create_task_metadata_source_with_rendering_options( metadata = create_task_metadata( child_node, execution_mode=ExecutionMode.LOCAL, - source_rendering_behavior=source_rendering_behavior, + render_config=RenderConfig( + source_rendering_behavior=source_rendering_behavior, + ), args={}, dbt_dag_task_group_identifier="", ) @@ -922,10 +926,12 @@ def test_create_task_metadata_normalize_task_id( args=args, dbt_dag_task_group_identifier="", use_task_group=use_task_group, - normalize_task_id=normalize_task_id, - normalize_task_display_name=normalize_task_display_name, - source_rendering_behavior=SourceRenderingBehavior.ALL, - test_behavior=test_behavior, + render_config=RenderConfig( + normalize_task_id=normalize_task_id, + normalize_task_display_name=normalize_task_display_name, + source_rendering_behavior=SourceRenderingBehavior.ALL, + test_behavior=test_behavior, + ), ) assert metadata.id == expected_node_id if expected_display_name: @@ -1105,10 +1111,12 @@ def test_owner(dbt_extra_config, expected_owner): }, dbt_project_name="astro_shop", node_converters={}, - test_behavior=TestBehavior.AFTER_EACH, + render_config=RenderConfig( + test_behavior=TestBehavior.AFTER_EACH, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + enable_owner_inheritance=True, + ), on_warning_callback=None, - source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, - enable_owner_inheritance=True, ) assert len(output.leaves) == 1 @@ -1232,7 +1240,9 @@ def test_create_task_metadata_disable_owner_inheritance(enable_owner_inheritance execution_mode=ExecutionMode.LOCAL, args={"project_dir": SAMPLE_PROJ_PATH}, dbt_dag_task_group_identifier="test_dag", - enable_owner_inheritance=enable_owner_inheritance, + render_config=RenderConfig( + enable_owner_inheritance=enable_owner_inheritance, + ), ) assert task_metadata is not None @@ -1322,11 +1332,14 @@ def test_generate_task_or_group_disable_owner_inheritance(enable_owner_inheritan ), ), }, - test_behavior=TestBehavior.NONE, - source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + render_config=RenderConfig( + test_behavior=TestBehavior.NONE, + source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR, + enable_owner_inheritance=enable_owner_inheritance, + ), test_indirect_selection=TestIndirectSelection.EAGER, on_warning_callback=None, - enable_owner_inheritance=enable_owner_inheritance, + node_converters={}, ) assert task_or_group is not None diff --git a/tests/dbt/test_pruning.py b/tests/dbt/test_pruning.py index 9d34a0e3ea..6b1a568ad9 100644 --- a/tests/dbt/test_pruning.py +++ b/tests/dbt/test_pruning.py @@ -13,7 +13,7 @@ create_task_metadata, generate_task_or_group, ) -from cosmos.config import ProfileConfig +from cosmos.config import ProfileConfig, RenderConfig from cosmos.constants import ( DbtResourceType, ExecutionMode, @@ -224,9 +224,11 @@ def test_create_task_metadata_source_pruning_disabled(self): node=self.source_with_downstream, execution_mode=ExecutionMode.LOCAL, args=self.task_args, - dbt_dag_task_group_identifier="test", - source_rendering_behavior=SourceRenderingBehavior.ALL, - source_pruning=False, # Disabled + dbt_dag_task_group_identifier="tes t", + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.ALL, + source_pruning=False, # Disabled + ), filtered_nodes=self.filtered_nodes_with_model, ) @@ -235,8 +237,10 @@ def test_create_task_metadata_source_pruning_disabled(self): execution_mode=ExecutionMode.LOCAL, args=self.task_args, dbt_dag_task_group_identifier="test", - source_rendering_behavior=SourceRenderingBehavior.ALL, - source_pruning=False, # Disabled + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.ALL, + source_pruning=False, # Disabled + ), filtered_nodes=self.filtered_nodes_empty, ) @@ -254,8 +258,10 @@ def test_create_task_metadata_source_pruning_enabled(self): execution_mode=ExecutionMode.LOCAL, args=self.task_args, dbt_dag_task_group_identifier="test", - source_rendering_behavior=SourceRenderingBehavior.ALL, - source_pruning=True, # Enabled + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.ALL, + source_pruning=True, # Enabled + ), filtered_nodes=self.filtered_nodes_with_model, ) @@ -265,8 +271,10 @@ def test_create_task_metadata_source_pruning_enabled(self): execution_mode=ExecutionMode.LOCAL, args=self.task_args, dbt_dag_task_group_identifier="test", - source_rendering_behavior=SourceRenderingBehavior.ALL, - source_pruning=True, # Enabled + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.ALL, + source_pruning=True, # Enabled + ), filtered_nodes=self.filtered_nodes_with_model, ) @@ -282,8 +290,10 @@ def test_create_task_metadata_source_pruning_edge_cases(self): execution_mode=ExecutionMode.LOCAL, args=self.task_args, dbt_dag_task_group_identifier="test", - source_rendering_behavior=SourceRenderingBehavior.ALL, - source_pruning=True, + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.ALL, + source_pruning=True, + ), filtered_nodes=None, # None means skip pruning check ) @@ -293,8 +303,10 @@ def test_create_task_metadata_source_pruning_edge_cases(self): execution_mode=ExecutionMode.LOCAL, args=self.task_args, dbt_dag_task_group_identifier="test", - source_rendering_behavior=SourceRenderingBehavior.ALL, - source_pruning=True, + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.ALL, + source_pruning=True, + ), filtered_nodes={}, # Empty dict means skip pruning check ) @@ -311,8 +323,10 @@ def test_create_task_metadata_source_pruning_edge_cases(self): execution_mode=ExecutionMode.LOCAL, args=self.task_args, dbt_dag_task_group_identifier="test", - source_rendering_behavior=SourceRenderingBehavior.ALL, - source_pruning=True, + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.ALL, + source_pruning=True, + ), filtered_nodes={unrelated_model.unique_id: unrelated_model}, # Non-empty but doesn't use this source ) @@ -333,11 +347,14 @@ def test_generate_task_or_group_source_pruning(self): node=self.source_with_downstream, execution_mode=ExecutionMode.LOCAL, task_args=self.task_args, - test_behavior=TestBehavior.NONE, - source_rendering_behavior=SourceRenderingBehavior.ALL, - source_pruning=True, + render_config=RenderConfig( + test_behavior=TestBehavior.NONE, + source_rendering_behavior=SourceRenderingBehavior.ALL, + source_pruning=True, + ), test_indirect_selection=TestIndirectSelection.EAGER, filtered_nodes=self.filtered_nodes_with_model, + node_converters={}, ) # Orphaned source should NOT generate a task @@ -348,10 +365,13 @@ def test_generate_task_or_group_source_pruning(self): execution_mode=ExecutionMode.LOCAL, task_args=self.task_args, test_behavior=TestBehavior.NONE, - source_rendering_behavior=SourceRenderingBehavior.ALL, - source_pruning=True, + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.ALL, + source_pruning=True, + ), test_indirect_selection=TestIndirectSelection.EAGER, filtered_nodes=self.filtered_nodes_with_model, + node_converters={}, ) assert task_with_downstream is not None @@ -375,8 +395,10 @@ def test_source_pruning_with_different_source_rendering_behaviors(self): execution_mode=ExecutionMode.LOCAL, args=self.task_args, dbt_dag_task_group_identifier="test", - source_rendering_behavior=SourceRenderingBehavior.NONE, - source_pruning=True, + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.NONE, + source_pruning=True, + ), filtered_nodes=self.filtered_nodes_with_model, ) @@ -386,8 +408,10 @@ def test_source_pruning_with_different_source_rendering_behaviors(self): execution_mode=ExecutionMode.LOCAL, args=self.task_args, dbt_dag_task_group_identifier="test", - source_rendering_behavior=SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS, - source_pruning=True, + render_config=RenderConfig( + source_rendering_behavior=SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS, + source_pruning=True, + ), filtered_nodes=self.filtered_nodes_with_model, ) @@ -504,11 +528,14 @@ def test_source_pruning_parameter_defaults(self): node=source, execution_mode=ExecutionMode.LOCAL, task_args=task_args, - test_behavior=TestBehavior.NONE, - source_rendering_behavior=SourceRenderingBehavior.ALL, + render_config=RenderConfig( + test_behavior=TestBehavior.NONE, + source_rendering_behavior=SourceRenderingBehavior.ALL, + # source_pruning not specified - should default to False + ), test_indirect_selection=TestIndirectSelection.EAGER, - # source_pruning not specified - should default to False # filtered_nodes not specified - should default to None + node_converters={}, ) # Should create a task because pruning is disabled by default From 7dd880d9b2412193640a5e65ddb50dfb591c20c5 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 4 Nov 2025 13:58:37 +0000 Subject: [PATCH 12/23] Fix integration test --- cosmos/airflow/graph.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 5815127fe9..f6e3206784 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -553,7 +553,12 @@ def generate_task_or_group( if task_meta and not node.resource_type == DbtResourceType.TEST: if use_task_group and (not convert_entire_task_group): with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group: - task = generate_or_convert_task(**generate_or_convert_task_args) # type: ignore[arg-type] + task = generate_or_convert_task( + **{ # type: ignore[arg-type] + **generate_or_convert_task_args, + "task_group": model_task_group, + }, + ) test_meta = create_test_task_metadata( "test", execution_mode, @@ -566,6 +571,7 @@ def generate_task_or_group( ) test_task_generate_or_convert_task_args = { **generate_or_convert_task_args, + "task_group": model_task_group, "task_meta": test_meta, "resource_type": DbtResourceType.TEST, # type: ignore } From 784342aa31e2f2b7bace70e6f458317c81494d96 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 4 Nov 2025 14:07:17 +0000 Subject: [PATCH 13/23] Improve docs --- docs/configuration/render-config.rst | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 033d2d7604..be9c20ff52 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -51,8 +51,10 @@ Users may opt to use ``InvocationMode.SUBPROCESS`` when they have multiple Pytho and do not want Cosmos to use the dbt version installed in the same Python Virtualenv as Airflow to parse the DAG. -Customizing how nodes are rendered (experimental) -------------------------------------------------- +Customizing how nodes are rendered +---------------------------------- + +This feature is available in Cosmos 1.2.0 onwards. There are circumstances when choosing specific Airflow operators to represent a dbt node is helpful. An example could be to use an S3 sensor to represent dbt sources or to create custom operators to handle exposures. @@ -66,3 +68,7 @@ The following example illustrates how it is possible to tell Cosmos how to conve :end-before: [END custom_dbt_nodes] When defining the mapping for a new type that is not part of Cosmos' ``DbtResourceType`` enumeration, users should use the syntax ``DbtResourceType("new-node-type")`` as opposed to ``DbtResourceType.EXISTING_TYPE``. It will dynamically add the new type to the enumeration ``DbtResourceType`` so that Cosmos can parse these dbt nodes and convert them into the Airflow DAG. + +In Cosmos 1.12.0, this feature was further improved by adding the ``RenderConfig.node_conversion_by_task_group`` parameter. +This parameter allows users to control if node_converters are used at the task group level (ex. converting models with test_behavior=AFTER_EACH means the entire task group is converted including the run task and the test task), +or the individual task level (gives more granularity for converting just the run tasks or just the test tasks). Defaults to True. From b3e3435b5c4cf01a807a352cb83b2b982cf41b64 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 4 Nov 2025 14:19:08 +0000 Subject: [PATCH 14/23] Update the docs --- CHANGELOG.rst | 24 ++++++++++++++++++++++++ cosmos/__init__.py | 2 +- docs/configuration/render-config.rst | 4 ++-- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f0e36e99ae..554b954374 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,30 @@ Changelog ========= +1.12.0 (2025-11-04) +------------------- + +Breaking changes + +* The follow functions are expected to be used internally only to Cosmos, so we expect these won't impact end-users, but we are documenting the changes just in case: + - ``generate_task_or_group`` receives ``render_config`` instead of its individual configurations, such as test_behavior, source_rendering_behavior and enable_owner_inheritance + - ``create_task_metadata`` receives ``render_config`` instead of its individual configurations, such as test_behavior, source_rendering_behavior and enable_owner_inheritance + - ``create_task_metadata`` expects the ``node_converters`` argument + +Features + +* Support applying ``node_converter`` at a task level instead of task group level by @anyapriya in #1759 + +Enhancements + +* Remove usage of contextmanager in plugins for accessing connections in Airflow >= 3.1.2 by @pankajkoti in #2073 +* Refactor ``airflow/graph.py`` to simplify code-base by @tatiana in #2080 + +Others + +* Fix broken CI due to fastapi incompatibility with cadwyn for Airflow 3 by @pankajkoti in #2076 + + 1.11.0 (2025-10-29) --------------------- diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 321d8791cf..3422485ca5 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -9,7 +9,7 @@ from cosmos import settings -__version__ = "1.11.0" +__version__ = "1.12.0a1" if not settings.enable_memory_optimised_imports: from cosmos.airflow.dag import DbtDag diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index be9c20ff52..e1e598d20d 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -15,7 +15,7 @@ The ``RenderConfig`` class takes the following arguments: - ``selector``: (new in v1.3) name of a dbt YAML selector to use for DAG parsing. Only supported when using ``load_method=LoadMode.DBT_LS``. See `Selecting & Excluding `_ for more information. - ``dbt_deps``: (deprecated in v1.9, use ``ProjectConfig.install_dbt_deps`` onwards) A Boolean to run dbt deps when using dbt ls for dag parsing. Default True - ``node_converters``: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. Find more information below. -- ``node_conversion_by_task_group``: A boolean to control if node_converters are used at the task group level (ex. converting models with test_behavior=AFTER_EACH means the entire task group is converted including the run task and the test task), or the individual task level (gives more granularity for converting just the run tasks or just the test tasks). Defaults to True. +- ``node_conversion_by_task_group``: (new in v1.12.0) A boolean to control if node_converters are used at the task group level (ex. converting models with test_behavior=AFTER_EACH means the entire task group is converted including the run task and the test task), or the individual task level (gives more granularity for converting just the run tasks or just the test tasks). Defaults to True. - ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. - ``dbt_ls_path``: Should be set when using ``load_method=LoadMode.DBT_LS_OUTPUT``. Path of the user-managed output of ``dbt ls``. - ``enable_mock_profile``: When using ``LoadMode.DBT_LS`` with a ``ProfileMapping`` class, by default, Cosmos mocks the values of the profile. Defaults to True. In order to leverage partial parsing, this argument should be set to ``False``. Read `Partial parsing <./partial-parsing.html#profile-configuration.html>`_ for more information. @@ -54,7 +54,7 @@ and do not want Cosmos to use the dbt version installed in the same Python Virtu Customizing how nodes are rendered ---------------------------------- -This feature is available in Cosmos 1.2.0 onwards. +.. versionadded:: 1.2.0 There are circumstances when choosing specific Airflow operators to represent a dbt node is helpful. An example could be to use an S3 sensor to represent dbt sources or to create custom operators to handle exposures. From ea2a250d58ccbdb265a689f729691522f1411cd8 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 4 Nov 2025 14:23:39 +0000 Subject: [PATCH 15/23] improve changelog --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 554b954374..7d2b1846a3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -6,7 +6,7 @@ Changelog Breaking changes -* The follow functions are expected to be used internally only to Cosmos, so we expect these won't impact end-users, but we are documenting the changes just in case: +* The following functions are expected to be used internally only to Cosmos, so we hope these won't impact end-users, but we are documenting the changes just in case: - ``generate_task_or_group`` receives ``render_config`` instead of its individual configurations, such as test_behavior, source_rendering_behavior and enable_owner_inheritance - ``create_task_metadata`` receives ``render_config`` instead of its individual configurations, such as test_behavior, source_rendering_behavior and enable_owner_inheritance - ``create_task_metadata`` expects the ``node_converters`` argument From 5c8d1da1bb328c5bf74c79de546d890bc7026c90 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 4 Nov 2025 14:56:22 +0000 Subject: [PATCH 16/23] Add pre-commit to changelog --- CHANGELOG.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7d2b1846a3..df939bdaab 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,15 +1,15 @@ Changelog ========= -1.12.0 (2025-11-04) +1.12.0a1 (2025-11-04) ------------------- Breaking changes -* The following functions are expected to be used internally only to Cosmos, so we hope these won't impact end-users, but we are documenting the changes just in case: - - ``generate_task_or_group`` receives ``render_config`` instead of its individual configurations, such as test_behavior, source_rendering_behavior and enable_owner_inheritance - - ``create_task_metadata`` receives ``render_config`` instead of its individual configurations, such as test_behavior, source_rendering_behavior and enable_owner_inheritance - - ``create_task_metadata`` expects the ``node_converters`` argument +* Introduced in the PR #2080. The following functions are expected to be used internally only to Cosmos, so we hope these won't impact end-users, but we are documenting the changes just in case: + - ``generate_task_or_group`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance`` + - ``create_task_metadata`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance`` + - ``create_task_metadata`` now expects the ``node_converters`` argument Features @@ -23,7 +23,7 @@ Enhancements Others * Fix broken CI due to fastapi incompatibility with cadwyn for Airflow 3 by @pankajkoti in #2076 - +* Pre-commit updates: #2078 1.11.0 (2025-10-29) --------------------- From 50923d0f9ef0434b50fffaaee6d815e4df90ab88 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 4 Nov 2025 15:25:57 +0000 Subject: [PATCH 17/23] Forward arguments introduced in 1.11.0 to conversion_function normalize_task_id=render_config.normalize_task_id, normalize_task_display_name=render_config.normalize_task_display_name, enable_owner_inheritance=render_config.enable_owner_inheritance, --- cosmos/airflow/graph.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index f6e3206784..5cf0f24337 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -476,9 +476,11 @@ def generate_or_convert_task( task_args=task_args, test_behavior=render_config.test_behavior, source_rendering_behavior=render_config.source_rendering_behavior, + normalize_task_id=render_config.normalize_task_id, + normalize_task_display_name=render_config.normalize_task_display_name, + enable_owner_inheritance=render_config.enable_owner_inheritance, test_indirect_selection=test_indirect_selection, on_warning_callback=on_warning_callback, - normalize_task_id=render_config.normalize_task_id, node=node, task_id=task_id, detached_from_parent=detached_from_parent, From 3d519a5fc79b4aec3516c3c73d54d9b9b27cea0d Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 4 Nov 2025 15:33:37 +0000 Subject: [PATCH 18/23] Add missing properties introduced since https://github.com/astronomer/astronomer-cosmos/pull/1759. was first implemented --- cosmos/__init__.py | 2 +- cosmos/airflow/graph.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 3422485ca5..4e62b386bd 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -9,7 +9,7 @@ from cosmos import settings -__version__ = "1.12.0a1" +__version__ = "1.12.0a2" if not settings.enable_memory_optimised_imports: from cosmos.airflow.dag import DbtDag diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 5cf0f24337..020cd4f2f8 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -468,6 +468,7 @@ def generate_or_convert_task( "Its syntax and behavior can be changed before a major release." ) logger.debug(f"Converting node <{node.unique_id}> task <{task_id}> using <{conversion_function.__name__}>") + # In Cosmos 2.0 we should review this implementation and use render_config or another simpler interface: task = conversion_function( # type: ignore dag=dag, task_group=task_group, @@ -475,6 +476,7 @@ def generate_or_convert_task( execution_mode=execution_mode, task_args=task_args, test_behavior=render_config.test_behavior, + source_pruning=render_config.source_pruning, source_rendering_behavior=render_config.source_rendering_behavior, normalize_task_id=render_config.normalize_task_id, normalize_task_display_name=render_config.normalize_task_display_name, From 8649e1a087bdff15941801c0733b85ab0a60f0f0 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 6 Nov 2025 16:11:10 +0000 Subject: [PATCH 19/23] Update config.py --- cosmos/config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cosmos/config.py b/cosmos/config.py index 2807b5ebb8..1bfe248fd7 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -76,7 +76,6 @@ class RenderConfig: :param normalize_task_display_name: A callable that takes a dbt node as input and returns the task display name. This allows users to assign a custom task display name separate from the node ID. :param should_detach_multiple_parents_tests: A boolean that allows users to decide whether to run tests with multiple parent dependencies in separate tasks. :param enable_owner_inheritance: A boolean that allows users to enable the owner inheritance from dbt models to airflow tasks. Defaults to True. - :param node_conversion_by_task_group: A boolean that allows users to do node conversion at the task group level instead of task level. Defaults to True. """ emit_datasets: bool = True From 44af853fda9a2260386b171e88febdf551214988 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 6 Nov 2025 16:16:38 +0000 Subject: [PATCH 20/23] Apply suggestions from code review --- cosmos/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cosmos/config.py b/cosmos/config.py index 1bfe248fd7..f31067b669 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -64,6 +64,7 @@ class RenderConfig: :param selector: Name of a dbt YAML selector to use for parsing. Only supported when using ``load_method=LoadMode.DBT_LS``. :param dbt_deps: (deprecated) Configure to run dbt deps when using dbt ls for dag parsing :param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. + :param node_conversion_by_task_group: A boolean that allows users to do node conversion at the task group level instead of task level. Defaults to True. :param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. :param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``. :param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``. From d03ab3fb238416ac0ec6a37f955f26d5533a2d85 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 6 Nov 2025 16:17:37 +0000 Subject: [PATCH 21/23] Remove warning since node_converters is no longer experimental --- cosmos/airflow/graph.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 5a5eaa8062..5db6e3d162 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -463,10 +463,6 @@ def generate_or_convert_task( conversion_function = node_converters.get(resource_type, None) if conversion_function is not None: task_id = task_meta.id - logger.warning( - "The `node_converters` attribute is an experimental feature. " - "Its syntax and behavior can be changed before a major release." - ) logger.debug(f"Converting node <{node.unique_id}> task <{task_id}> using <{conversion_function.__name__}>") # In Cosmos 2.0 we should review this implementation and use render_config or another simpler interface: task = conversion_function( # type: ignore From c17c9e1d952ebddaa94a65282ac333a8caaf75ed Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 5 Nov 2025 14:48:10 +0000 Subject: [PATCH 22/23] Update changelog with 1.12.0a1 --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index df939bdaab..cad369a8d0 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,7 +1,7 @@ Changelog ========= -1.12.0a1 (2025-11-04) +1.12.0a2 (2025-11-04) ------------------- Breaking changes From ae3954f7065f0d2b4b37bb88c83e7e5c340a9140 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 7 Nov 2025 13:08:26 +0000 Subject: [PATCH 23/23] Remove dbt 1.5 from run-integration-tests-dbt-async --- cosmos/config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cosmos/config.py b/cosmos/config.py index f31067b669..c34c4c7bd3 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -102,7 +102,6 @@ class RenderConfig: normalize_task_display_name: Callable[..., Any] | None = None should_detach_multiple_parents_tests: bool = False enable_owner_inheritance: bool | None = True - node_conversion_by_task_group: bool | None = True def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: