From 7e3cde756610b6bd3f447a9ee32590b7e8163a4a Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 11 Feb 2025 21:49:31 +0800 Subject: [PATCH] feat(task_sdk): remove data_interval_start, data_interval_end, prev_data_interval_start_success, prev_data_interval_end_success for dag_run that has no data_interval --- .../airflow/sdk/execution_time/task_runner.py | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py b/task_sdk/src/airflow/sdk/execution_time/task_runner.py index 3e41f24589d3d..5e1e1063ea92c 100644 --- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py @@ -155,16 +155,8 @@ def get_template_context(self) -> Context: context_from_server: Context = { # TODO: Assess if we need to pass these through timezone.coerce_datetime "dag_run": dag_run, # type: ignore[typeddict-item] # Removable after #46522 - "data_interval_end": dag_run.data_interval_end, - "data_interval_start": dag_run.data_interval_start, "task_instance_key_str": f"{self.task.dag_id}__{self.task.task_id}__{dag_run.run_id}", "task_reschedule_count": self._ti_context_from_server.task_reschedule_count, - "prev_data_interval_start_success": lazy_object_proxy.Proxy( - lambda: get_previous_dagrun_success(self.id).data_interval_start - ), - "prev_data_interval_end_success": lazy_object_proxy.Proxy( - lambda: get_previous_dagrun_success(self.id).data_interval_end - ), "prev_start_date_success": lazy_object_proxy.Proxy( lambda: get_previous_dagrun_success(self.id).start_date ), @@ -174,6 +166,20 @@ def get_template_context(self) -> Context: } context.update(context_from_server) + if dag_run.data_interval_start and dag_run.data_interval_end: + context.update( + { + "data_interval_end": dag_run.data_interval_end, + "data_interval_start": dag_run.data_interval_start, + "prev_data_interval_start_success": lazy_object_proxy.Proxy( + lambda: get_previous_dagrun_success(self.id).data_interval_start + ), + "prev_data_interval_end_success": lazy_object_proxy.Proxy( + lambda: get_previous_dagrun_success(self.id).data_interval_end + ), + } + ) + if logical_date := dag_run.logical_date: ds = logical_date.strftime("%Y-%m-%d") ds_nodash = ds.replace("-", "")