Refactor airflow/graph.py to simplify code-base#2080
Merged
Conversation
…de_converter_at_task_level
…de_converter_at_task_level
…de_converter_at_task_level
airflow/graph.py to simplify code-base
normalize_task_id=render_config.normalize_task_id,
normalize_task_display_name=render_config.normalize_task_display_name,
enable_owner_inheritance=render_config.enable_owner_inheritance,
Contributor
There was a problem hiding this comment.
Pull Request Overview
This PR refactors the airflow/graph.py module to reduce code complexity and duplication introduced in PR #1759. The refactoring consolidates multiple configuration parameters into a RenderConfig object and adds support for finer-grained node conversion control.
Key changes:
- Consolidates individual configuration parameters (
test_behavior,source_rendering_behavior,enable_owner_inheritance, etc.) into a singlerender_configparameter - Introduces
node_conversion_by_task_groupflag to control whether node converters apply at task group or individual task level - Adds new
generate_or_convert_taskfunction to centralize task creation/conversion logic
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| cosmos/init.py | Version bump to 1.12.0a2 |
| CHANGELOG.rst | Documents breaking changes and new features for v1.12.0a1 |
| cosmos/config.py | Adds node_conversion_by_task_group parameter to RenderConfig |
| docs/configuration/render-config.rst | Documents the new node_conversion_by_task_group parameter and updates node converter documentation |
| cosmos/airflow/graph.py | Major refactoring to consolidate parameters into render_config and add generate_or_convert_task function |
| tests/airflow/test_graph.py | Updates tests to use RenderConfig wrapper and adds comprehensive node converter tests |
| tests/dbt/test_pruning.py | Updates tests to pass render_config instead of individual parameters |
Comments suppressed due to low confidence (1)
tests/airflow/test_graph.py:1
- The
test_behaviorparameter is passed both as a standalone argument (line 367) and inside the RenderConfig (implicitly using the default). This is inconsistent with the refactoring goal. The standalonetest_behaviorparameter should be removed and only set within the RenderConfig.
import os
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
tatiana
commented
Nov 6, 2025
tatiana
commented
Nov 6, 2025
tatiana
commented
Nov 6, 2025
pankajastro
approved these changes
Nov 7, 2025
Merged
pankajkoti
added a commit
that referenced
this pull request
Dec 18, 2025
Breaking changes * Introduced in the PR #2080. The following functions are expected to be used internally only to Cosmos, so we hope these won't impact end-users, but we are documenting the changes just in case: - ``generate_task_or_group`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance`` - ``create_task_metadata`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance`` - ``create_task_metadata`` now expects the ``node_converters`` argument * Drop Python 3.9 support by @pankajastro in #2118 * Drop Airflow 2.4 support by @pankajastro in #2161 * Drop Airflow 2.5 support by @pankajastro in #2165 Features * Support applying ``node_converter`` at a task level instead of task group level by @anyapriya in #1759 * Allow overriding ``DbtProducerWatcherOperator`` parameters via ``ExecutionConfig.setup_operator_args`` by @pankajastro in #2133 * Use deferrable sensors by default in ``ExecutionMode.WATCHER`` by @pankajastro in #2084 * Support real-time consumer updates when using ``ExecutionMode.WATCHER`` and ``InvocationMode.SUBPROCESS`` by @pankajastro in #2152 * Update telemetry to v3 format with query parameters by @pankajkoti in #2192 * Add initial set of telemetry task listener metrics for Cosmos operators by @pankajkoti in #2195 Enhancements * Unify Airflow version handling into ``constants.py`` by @tatiana in #2089 * Refactor ``airflow/graph.py`` to simplify the code base by @tatiana in #2080 * Force watcher producer retries to zero by @pankajkoti in #2114 * Fail ``ExecutionMode.WATCHER`` consumer sensors immediately when the producer fails using Airflow context by @pankajkoti in #2126 * ``ExecutonMode.WATCHER``: fetch producer status asynchronously from the Airflow runtime so deferrable sensors fail immediately when the producer task fails by @pankajkoti in #2144 * Refactor ``ExecutionMode.WATCHER`` ``InvocationMode.SUBPROCESS`` log parser by @tatiana in #2183 * Replace map_index with is_mapped_task boolean in task telemetry metrics by @pankajkoti in #2210 * Collect cosmos profile metrics in task telemetry metrics by @pankajastro in #2198 * Remove unnecessary information from telemetry by @tatiana in #2211 Bug fixes * Clarify ``ExecutionMode.WATCHER`` deferrable failure messaging by @pankajkoti in #2124 * Remove empty test tasks when all tests are detached by @anyapriya in #2010 * Fix forwarding ``DbtProducerWatcherOperator`` ``dbt build`` flags by @michal-mrazek in #2127 * Add databricks oauth mock profile by @fjmacagno in #2164 * Register listeners in Airflow 3 plugin implementation by @pankajastro in #2187 * Fix resolution of ``packages-install-path`` when it uses ``env_var`` by @tatiana in #2194 * Fix ``template_fields`` in ``DbtConsumerWatcherSensor`` to include ``DbtRunLocalOperator`` template_fields`` by @tiovader and @emanuel-luis in #2209 * Emit asset events in ExecutionMode.AIRFLOW_ASYNC mode by @pankajastro in #2184 * Remove dag_run_id from telemetry tests by @tatiana in #2213 Docs * Document dataset-event limitation when using ``ExecutionMode.AIRFLOW_ASYNC`` by @varaprasadregani in #2143 * Expand ``ExecutionMode.KUBERNETES`` guidance by @tatiana in #2139 * Add docs for deferrable ``DbtConsumerWatcherSensor`` by @pankajastro in #2115 * Fix reStructuredText formatting by @dnskr in #2132 * Add docs for ``setup_operator_args`` param by @pankajastro in #2136 * Remove experimental flag for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #2153 * Clarify ``ExecutionMode.AIRFLOW_ASYNC`` dataset limits by @pankajkoti in #2167 * Update PRIVACY_NOTICE.rst by @tatiana in #2212 Others * Drop Python 3.9 support by @pankajastro in #2118 * Drop Airflow 2.4 support by @pankajastro in #2161 * Drop Airflow 2.5 support by @pankajastro in #2165 * Improve example DAG ``jaffle_shop_kubernetes.py`` by @tatiana in #2140 * Enable tests for Python 3.13 by @pankajastro in #2154 * Add Python 3.12 to CI integration tests matrix by @pankajastro in #2168 * Retry flaky Telemetry success test to stabilise CI by @pankajkoti in #2138 * Drop unused producer state xcom handling in ``ExecutionMode.WATCHER`` by @pankajkoti in #2145 * Remove unused Python3.9 uses from Github action CI by @pankajastro in #2117 * Run pre-commit on ``ExecutionMode.WATCHER`` modules by @pankajkoti in #2150 * Refactor: Use shared airflow version constant by @pankajkoti in #2157 * Pin ``pydantic<2.0`` for Airflow 2.6 compatibility by @pankajastro in #2172 * Remove duplicate ``dbt-duckdb`` dependency by @pankajastro in #2170 * Add targeted ``type: ignore`` for untyped decorators to fix ``mypy`` errors by @pankajastro in #2174 * Replace Legacy typing Aliases with Built-in Types for Python 3.10+ by @pankajastro in #2175 * Refactor to reuse ``load_method_from_module`` from ``_utils/importer.py`` by @pankajastro in #2176 * Remove try except block for cache import and unused python_version variable by @pankajastro in #2186 * Unpin Airflow to satisfy GitHub Security tab requirements by @pankajastro in #2171 * Update Python version for ``pyupgrade`` in ``pre-commit`` config by @pankajastro in #2190 * Add cooldown config in ``dependabot`` config by @pankajastro in #2189 * Adjust pre-commit so Python 3.10 or higher can be used by @tatiana in #2196 * Remove empty variables emission from telemetry metrics by @pankajkoti in #2197 * Reformat documented comments for historical URL formats by @pankajkoti in #2199 * Bump ``actions/checkout`` from ``5.0.0`` to ``5.0.1`` by @dependabot in #2135 * Bump ``actions/checkout`` to ``6.0.0`` in GitHub workflows by @dependabot in #2147 * Bump ``zizmorcore/zizmor-action`` from ``0.2.0`` to ``0.3.0`` by @dependabot in #2156 * Bump ``actions/checkout`` from ``5.0.1`` to ``6.0.0`` by @dependabot in #2155 * Bump ``actions/checkout`` from ``6.0.0`` to ``6.0.1`` by @dependabot in #2178 * Bump ``codecov/codecov-action`` from ``5.5.1`` to ``5.5.2`` by @dependabot in #2208 * pre-commit autoupdate by @pre-commit-ci[bot] in #2134, #2162, #2173, #2191, #2202 closes: astronomer/oss-integrations-private#275
2 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
The goal with this PR is to refactor the
airflow/graph.pyto make the code more manageable.The PR #1759 should be merged before the current changes are applied.
The complexity of building DAGs has increased significantly with #1759. As part of this PR, we ended up duplicating configurations and argument passing in multiple parts of the code, adding more complexity to the
airflow/graph.pymodule.This PR also aims to add documentation changes that will help users of
node_convertersandnode_conversion_by_task_group.Breaking changes
The following functions are expected to be used internally only to Cosmos, so we hope these won't impact end-users, but we are documenting the changes just in case:
generate_task_or_groupreceivesrender_configinstead of its individual configurations, such astest_behavior,source_rendering_behaviorandenable_owner_inheritancecreate_task_metadatareceivesrender_configinstead of its individual configurations, such astest_behavior,source_rendering_behaviorandenable_owner_inheritancecreate_task_metadatanow expects thenode_convertersargument