Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from airflow.stats import Stats
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.typing_compat import TypedDict
from airflow.utils import timezone
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.trigger_handler import (
Expand Down Expand Up @@ -608,6 +609,13 @@ async def run_trigger(self, trigger_id, trigger):
self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event)
self.triggers[trigger_id]["events"] += 1
self.events.append((trigger_id, event))
except asyncio.CancelledError as err:
if timeout := trigger.task_instance.trigger_timeout:
timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout
if timeout < timezone.utcnow():
self.log.error("Trigger cancelled due to timeout")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the timeout case, does it log the error twice?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it logs the timeout error and the original asyncio.CancelledError message

self.log.error("Trigger cancelled; message=%s", err)
raise
finally:
# CancelledError will get injected when we're stopped - which is
# fine, the cleanup process will understand that, but we want to
Expand Down
29 changes: 28 additions & 1 deletion tests/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import importlib
import time
from threading import Thread
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pendulum
import pytest
Expand Down Expand Up @@ -259,6 +259,33 @@ def test_trigger_lifecycle(session):
job_runner.trigger_runner.stop = True


class TestTriggerRunner:
@pytest.mark.asyncio
@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging")
async def test_run_trigger_canceled(self, session) -> None:
trigger_runner = TriggerRunner()
trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}}
mock_trigger = MagicMock()
mock_trigger.task_instance.trigger_timeout = None
mock_trigger.run.side_effect = asyncio.CancelledError()

with pytest.raises(asyncio.CancelledError):
await trigger_runner.run_trigger(1, mock_trigger)

@pytest.mark.asyncio
@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging")
async def test_run_trigger_timeout(self, session, caplog) -> None:
trigger_runner = TriggerRunner()
trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}}
mock_trigger = MagicMock()
mock_trigger.task_instance.trigger_timeout = timezone.utcnow() - datetime.timedelta(hours=1)
mock_trigger.run.side_effect = asyncio.CancelledError()

with pytest.raises(asyncio.CancelledError):
await trigger_runner.run_trigger(1, mock_trigger)
assert "Trigger cancelled due to timeout" in caplog.text


def test_trigger_create_race_condition_18392(session, tmp_path):
"""
This verifies the resolution of race condition documented in github issue #18392.
Expand Down