diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 7c6b2b20444dd..5f4b77cefa00f 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -40,6 +40,7 @@ from airflow.stats import Stats from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.typing_compat import TypedDict +from airflow.utils import timezone from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.log.trigger_handler import ( @@ -608,6 +609,13 @@ async def run_trigger(self, trigger_id, trigger): self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event) self.triggers[trigger_id]["events"] += 1 self.events.append((trigger_id, event)) + except asyncio.CancelledError as err: + if timeout := trigger.task_instance.trigger_timeout: + timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout + if timeout < timezone.utcnow(): + self.log.error("Trigger cancelled due to timeout") + self.log.error("Trigger cancelled; message=%s", err) + raise finally: # CancelledError will get injected when we're stopped - which is # fine, the cleanup process will understand that, but we want to diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index 0354465ea4cf4..73b8d878a267d 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -22,7 +22,7 @@ import importlib import time from threading import Thread -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pendulum import pytest @@ -259,6 +259,33 @@ def test_trigger_lifecycle(session): job_runner.trigger_runner.stop = True +class TestTriggerRunner: + @pytest.mark.asyncio + @patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging") + async def test_run_trigger_canceled(self, session) -> None: + trigger_runner = TriggerRunner() + trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}} + mock_trigger = MagicMock() + mock_trigger.task_instance.trigger_timeout = None + mock_trigger.run.side_effect = asyncio.CancelledError() + + with pytest.raises(asyncio.CancelledError): + await trigger_runner.run_trigger(1, mock_trigger) + + @pytest.mark.asyncio + @patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging") + async def test_run_trigger_timeout(self, session, caplog) -> None: + trigger_runner = TriggerRunner() + trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}} + mock_trigger = MagicMock() + mock_trigger.task_instance.trigger_timeout = timezone.utcnow() - datetime.timedelta(hours=1) + mock_trigger.run.side_effect = asyncio.CancelledError() + + with pytest.raises(asyncio.CancelledError): + await trigger_runner.run_trigger(1, mock_trigger) + assert "Trigger cancelled due to timeout" in caplog.text + + def test_trigger_create_race_condition_18392(session, tmp_path): """ This verifies the resolution of race condition documented in github issue #18392.