From 152fd9d4887349fc35c3faaa85f7ef7f04c17676 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 12 Nov 2025 16:08:04 +0000 Subject: [PATCH 1/3] Fix SQL templated field rendering for dynamically mapped tasks in AF2 Closes: #2018 --- cosmos/operators/local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 2d0aa97744..5bf65304da 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -444,12 +444,12 @@ def _override_rtif_airflow_2_x(session: Session = NEW_SESSION) -> None: assert ti.task is not None ti.task.template_fields = self.template_fields rtif = RenderedTaskInstanceFields(ti, render_templates=False) - # delete the old records session.query(RenderedTaskInstanceFields).filter( RenderedTaskInstanceFields.dag_id == self.dag_id, # type: ignore[attr-defined] RenderedTaskInstanceFields.task_id == self.task_id, RenderedTaskInstanceFields.run_id == ti.run_id, + RenderedTaskInstanceFields.map_index == ti.map_index ).delete() session.add(rtif) else: From 3de7ce4e3255a8099c39c13bd94d14d6581511ca Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 12 Nov 2025 16:29:05 +0000 Subject: [PATCH 2/3] Fix SQL templated field rendering for dynamically mapped tasks in AF2 Closes: #2018 --- cosmos/operators/local.py | 2 +- tests/operators/test_local.py | 56 +++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 5bf65304da..7a69402aa3 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -449,7 +449,7 @@ def _override_rtif_airflow_2_x(session: Session = NEW_SESSION) -> None: RenderedTaskInstanceFields.dag_id == self.dag_id, # type: ignore[attr-defined] RenderedTaskInstanceFields.task_id == self.task_id, RenderedTaskInstanceFields.run_id == ti.run_id, - RenderedTaskInstanceFields.map_index == ti.map_index + RenderedTaskInstanceFields.map_index == ti.map_index, ).delete() session.add(rtif) else: diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 6b536819ae..bdf9654db4 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -2014,6 +2014,62 @@ def test_override_rtif_airflow3_with_should_store_compiled_sql_false(): assert operator.overwrite_rtif_after_execution is False +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test only applies to Airflow 2") +def test_override_rtif_airflow2_filters_by_map_index(): + """Test that _override_rtif correctly filters by map_index for dynamically mapped tasks in Airflow 2. + + This test ensures that when deleting old RenderedTaskInstanceFields records, + the map_index filter is applied so that only the current mapped task instance's + records are deleted, not records from other mapped task instances. + + This addresses issue #2018 where SQL templated fields weren't properly rendered + for dynamically mapped tasks. + """ + from airflow.models.renderedtifields import RenderedTaskInstanceFields + + with DAG("test_dag", start_date=datetime(2023, 1, 1)) as dag: + operator = DbtRunLocalOperator( + task_id="test", + profile_config=profile_config, + project_dir="my/dir", + should_store_compiled_sql=True, + dag=dag, + ) + + mock_ti = MagicMock(spec=TaskInstance) + mock_ti.dag_id = "test_dag" + mock_ti.task_id = "test" + mock_ti.run_id = "test_run_1" + mock_ti.map_index = 2 # Simulating a mapped task instance + mock_ti.task = operator + + context = {"ti": mock_ti} + + mock_session = MagicMock() + mock_query = MagicMock() + mock_filter = MagicMock() + + mock_session.query.return_value = mock_query + mock_query.filter.return_value = mock_filter + mock_filter.delete.return_value = None + + with patch("airflow.utils.session.provide_session") as mock_provide_session: + def session_decorator(func): + def wrapper(*args, **kwargs): + return func(session=mock_session) + + return wrapper + + mock_provide_session.side_effect = session_decorator + + operator._override_rtif(context) + + mock_session.query.assert_called_once_with(RenderedTaskInstanceFields) + mock_query.filter.assert_called_once() + filter_call_arg_map_index = str(mock_query.filter.call_args.args[-1]) + assert filter_call_arg_map_index =='rendered_task_instance_fields.map_index = :map_index_1' + + def test_dbt_cmd_flags_templating(): """Test that dbt_cmd_flags supports Jinja templating.""" from datetime import datetime From f4620d191da97037a6b93b2f0b4e1ff99a51b1f4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 12 Nov 2025 16:33:35 +0000 Subject: [PATCH 3/3] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/operators/test_local.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index bdf9654db4..6783b67c6a 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -2054,6 +2054,7 @@ def test_override_rtif_airflow2_filters_by_map_index(): mock_filter.delete.return_value = None with patch("airflow.utils.session.provide_session") as mock_provide_session: + def session_decorator(func): def wrapper(*args, **kwargs): return func(session=mock_session) @@ -2067,7 +2068,7 @@ def wrapper(*args, **kwargs): mock_session.query.assert_called_once_with(RenderedTaskInstanceFields) mock_query.filter.assert_called_once() filter_call_arg_map_index = str(mock_query.filter.call_args.args[-1]) - assert filter_call_arg_map_index =='rendered_task_instance_fields.map_index = :map_index_1' + assert filter_call_arg_map_index == "rendered_task_instance_fields.map_index = :map_index_1" def test_dbt_cmd_flags_templating():