From 49c58d4b74482e58c72136a6227521dfbb2cbbb1 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 7 Jun 2023 17:55:44 +0800 Subject: [PATCH 01/12] feat(jobs/triggerer_job_runner): add triggerer canceled log #31720 --- airflow/jobs/triggerer_job_runner.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 7c6b2b20444dd..dd821af7e9a27 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -608,6 +608,8 @@ async def run_trigger(self, trigger_id, trigger): self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event) self.triggers[trigger_id]["events"] += 1 self.events.append((trigger_id, event)) + except asyncio.CancelledError: + self.log.info("Trigger cancelled due to timeout") finally: # CancelledError will get injected when we're stopped - which is # fine, the cleanup process will understand that, but we want to From a5e718bc0748e07ab7925016352a368e48b32c61 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 9 Jun 2023 12:33:09 +0800 Subject: [PATCH 02/12] feat(jobs/triggerer_job_runner): reraise CanceledError after logging --- airflow/jobs/triggerer_job_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index dd821af7e9a27..eec0c70a6efcf 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -610,6 +610,7 @@ async def run_trigger(self, trigger_id, trigger): self.events.append((trigger_id, event)) except asyncio.CancelledError: self.log.info("Trigger cancelled due to timeout") + raise finally: # CancelledError will get injected when we're stopped - which is # fine, the cleanup process will understand that, but we want to From 7ba948c78c3d7b990104ae634a4f6e8399be6a6b Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 12 Jun 2023 14:44:20 +0800 Subject: [PATCH 03/12] fix(triggerer_job): add detailed 
exception message to asyncio CancelledError --- airflow/jobs/triggerer_job_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index eec0c70a6efcf..8a5c995e1e113 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -608,8 +608,8 @@ async def run_trigger(self, trigger_id, trigger): self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event) self.triggers[trigger_id]["events"] += 1 self.events.append((trigger_id, event)) - except asyncio.CancelledError: - self.log.info("Trigger cancelled due to timeout") + except asyncio.CancelledError as err: + self.log.info("Trigger cancelled due to %s", str(err)) raise finally: # CancelledError will get injected when we're stopped - which is From 20b8e712bbeb0ea4e6af40f09b2d68cf274be3ef Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 13 Jun 2023 18:45:43 +0800 Subject: [PATCH 04/12] feat(triggerer_job_runner): check whether trigger is timeout when running --- airflow/jobs/triggerer_job_runner.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 8a5c995e1e113..9fe9b0d47c457 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -40,6 +40,7 @@ from airflow.stats import Stats from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.typing_compat import TypedDict +from airflow.utils import timezone from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.log.trigger_handler import ( @@ -608,6 +609,11 @@ async def run_trigger(self, trigger_id, trigger): self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event) self.triggers[trigger_id]["events"] += 1 self.events.append((trigger_id, event)) + + trigger_timeout = 
trigger.task_instance.trigger_timeout + if trigger_timeout and trigger_timeout < timezone.utcnow(): + self.log.info("Trigger cancelled due to timeout") + raise asyncio.CancelledError("Trigger cancelled due to timeout") except asyncio.CancelledError as err: self.log.info("Trigger cancelled due to %s", str(err)) raise From 2f87538db5c39befcb387d8be98d5919e81c3573 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 14 Jun 2023 10:35:27 +0800 Subject: [PATCH 05/12] refactor(triggerer_job_runner): update log message Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- airflow/jobs/triggerer_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 9fe9b0d47c457..8d6b97980fa5b 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -615,7 +615,7 @@ async def run_trigger(self, trigger_id, trigger): self.log.info("Trigger cancelled due to timeout") raise asyncio.CancelledError("Trigger cancelled due to timeout") except asyncio.CancelledError as err: - self.log.info("Trigger cancelled due to %s", str(err)) + self.log.info("Trigger cancelled; message=%s", str(err)) raise finally: # CancelledError will get injected when we're stopped - which is From 29bd13fca995659c98f09f1a10dc4ea52057e3f3 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 14 Jun 2023 16:32:02 +0800 Subject: [PATCH 06/12] feat(jobs/triggerer_job_runner): add triggerer canceled log --- airflow/jobs/triggerer_job_runner.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 8d6b97980fa5b..3fc2d2306b57f 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -609,12 +609,16 @@ async def run_trigger(self, trigger_id, trigger): self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], 
event) self.triggers[trigger_id]["events"] += 1 self.events.append((trigger_id, event)) - - trigger_timeout = trigger.task_instance.trigger_timeout - if trigger_timeout and trigger_timeout < timezone.utcnow(): - self.log.info("Trigger cancelled due to timeout") - raise asyncio.CancelledError("Trigger cancelled due to timeout") except asyncio.CancelledError as err: + trigger_timeout = trigger.task_instance.trigger_timeout + if trigger_timeout: + if not trigger_timeout.tzinfo: + trigger_timeout = trigger_timeout.replace(tzinfo=timezone.utc) + + if trigger_timeout < timezone.utcnow(): + self.log.info("Trigger cancelled due to timeout") + raise asyncio.CancelledError("Trigger cancelled due to timeout") + self.log.info("Trigger cancelled; message=%s", str(err)) raise finally: From 1692e08eec10b1a36690bddc2ac20ba2160d22e1 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 14 Jun 2023 19:35:09 +0800 Subject: [PATCH 07/12] test(triggerer_job): add test case to check TriggerRunner.run_trigger asyncio.CancelledError handling --- tests/jobs/test_triggerer_job.py | 34 +++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index 0354465ea4cf4..62bb5f5319ee4 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -22,7 +22,7 @@ import importlib import time from threading import Thread -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pendulum import pytest @@ -259,6 +259,32 @@ def test_trigger_lifecycle(session): job_runner.trigger_runner.stop = True +class TestTriggerRunner: + @pytest.mark.asyncio + @patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging") + async def test_run_trigger_canceled(self, session) -> None: + trigger_runner = TriggerRunner() + trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}} + mock_trigger = MagicMock() + 
mock_trigger.task_instance.trigger_timeout = None + mock_trigger.run.side_effect = asyncio.CancelledError() + + with pytest.raises(asyncio.CancelledError): + await trigger_runner.run_trigger(1, mock_trigger) + + @pytest.mark.asyncio + @patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging") + async def test_run_trigger_timeout(self, session) -> None: + trigger_runner = TriggerRunner() + trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}} + mock_trigger = MagicMock() + mock_trigger.task_instance.trigger_timeout = timezone.utcnow() - datetime.timedelta(hours=1) + mock_trigger.run.side_effect = asyncio.CancelledError() + + with pytest.raises(asyncio.CancelledError): + await trigger_runner.run_trigger(1, mock_trigger) + + def test_trigger_create_race_condition_18392(session, tmp_path): """ This verifies the resolution of race condition documented in github issue #18392. @@ -586,3 +612,9 @@ def non_pytest_handlers(val): assert qh.__class__ == LocalQueueHandler assert qh.queue == listener.queue listener.stop() + listener = setup_queue_listener() + assert handler not in non_pytest_handlers(log.handlers) + qh = log.handlers[-1] + assert qh.__class__ == LocalQueueHandler + assert qh.queue == listener.queue + listener.stop() From f7a4f70b54bbc33651d934e239e42e25732aafac Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 14 Jun 2023 19:56:51 +0800 Subject: [PATCH 08/12] feat(triggerer_job_runner): reraise the original exception after encountering timeout --- airflow/jobs/triggerer_job_runner.py | 2 +- tests/jobs/test_triggerer_job.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 3fc2d2306b57f..d1e076718c04a 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -617,7 +617,7 @@ async def run_trigger(self, trigger_id, trigger): if trigger_timeout < timezone.utcnow(): 
self.log.info("Trigger cancelled due to timeout") - raise asyncio.CancelledError("Trigger cancelled due to timeout") + raise self.log.info("Trigger cancelled; message=%s", str(err)) raise diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index 62bb5f5319ee4..e5c832659ef62 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -274,7 +274,7 @@ async def test_run_trigger_canceled(self, session) -> None: @pytest.mark.asyncio @patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging") - async def test_run_trigger_timeout(self, session) -> None: + async def test_run_trigger_timeout(self, session, caplog) -> None: trigger_runner = TriggerRunner() trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}} mock_trigger = MagicMock() @@ -283,6 +283,7 @@ async def test_run_trigger_timeout(self, session) -> None: with pytest.raises(asyncio.CancelledError): await trigger_runner.run_trigger(1, mock_trigger) + assert "Trigger cancelled due to timeout" in caplog.text def test_trigger_create_race_condition_18392(session, tmp_path): From 02e3187750fa8717293f406676afe76b43a8e67a Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 16 Jun 2023 11:03:21 +0800 Subject: [PATCH 09/12] tests(jobs/triggerer_job_runner): remove accidentally added tests Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- tests/jobs/test_triggerer_job.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index e5c832659ef62..73b8d878a267d 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -613,9 +613,3 @@ def non_pytest_handlers(val): assert qh.__class__ == LocalQueueHandler assert qh.queue == listener.queue listener.stop() - listener = setup_queue_listener() - assert handler not in non_pytest_handlers(log.handlers) - qh = log.handlers[-1] - assert 
qh.__class__ == LocalQueueHandler - assert qh.queue == listener.queue - listener.stop() From c38635b10cd4c5518ae9ef57bfa3fc374bc4fbf4 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 16 Jun 2023 11:12:51 +0800 Subject: [PATCH 10/12] feat(jobs/triggerer_job_runner): change logging level from info to error when logging triggerer timeout --- airflow/jobs/triggerer_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index d1e076718c04a..68cc475736337 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -616,7 +616,7 @@ async def run_trigger(self, trigger_id, trigger): trigger_timeout = trigger_timeout.replace(tzinfo=timezone.utc) if trigger_timeout < timezone.utcnow(): - self.log.info("Trigger cancelled due to timeout") + self.log.error("Trigger cancelled due to timeout") raise self.log.info("Trigger cancelled; message=%s", str(err)) From ffd870e76b62327aea2f3d5316fa169844973b96 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 16 Jun 2023 12:21:44 +0800 Subject: [PATCH 11/12] feat(jobs/triggerer_job_runner): change the logging level from info to error when encountering asyncio.CancelledError --- airflow/jobs/triggerer_job_runner.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 68cc475736337..307a0d6021251 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -617,9 +617,8 @@ async def run_trigger(self, trigger_id, trigger): if trigger_timeout < timezone.utcnow(): self.log.error("Trigger cancelled due to timeout") - raise - self.log.info("Trigger cancelled; message=%s", str(err)) + self.log.error("Trigger cancelled; message=%s", err) raise finally: # CancelledError will get injected when we're stopped - which is From ae27f913eef8f62a420190e4c77987c334df6cd0 Mon Sep 17 00:00:00 
2001 From: Wei Lee Date: Fri, 16 Jun 2023 15:48:52 +0800 Subject: [PATCH 12/12] refactor(triggerer_job_runner): rewrite timeout handling using walrus operator Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- airflow/jobs/triggerer_job_runner.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index 307a0d6021251..5f4b77cefa00f 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -610,14 +610,10 @@ async def run_trigger(self, trigger_id, trigger): self.triggers[trigger_id]["events"] += 1 self.events.append((trigger_id, event)) except asyncio.CancelledError as err: - trigger_timeout = trigger.task_instance.trigger_timeout - if trigger_timeout: - if not trigger_timeout.tzinfo: - trigger_timeout = trigger_timeout.replace(tzinfo=timezone.utc) - - if trigger_timeout < timezone.utcnow(): + if timeout := trigger.task_instance.trigger_timeout: + timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout + if timeout < timezone.utcnow(): self.log.error("Trigger cancelled due to timeout") - self.log.error("Trigger cancelled; message=%s", err) raise finally: