diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py index 24e62789b597b..dc91fdccdf2ba 100644 --- a/airflow/models/trigger.py +++ b/airflow/models/trigger.py @@ -24,6 +24,7 @@ from airflow.models.taskinstance import TaskInstance from airflow.triggers.base import BaseTrigger from airflow.utils import timezone +from airflow.utils.retries import run_with_db_retries from airflow.utils.session import provide_session from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime from airflow.utils.state import State @@ -88,9 +89,11 @@ def clean_unused(cls, session=None): (triggers have a one-to-many relationship to both) """ # Update all task instances with trigger IDs that are not DEFERRED to remove them - session.query(TaskInstance).filter( - TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None) - ).update({TaskInstance.trigger_id: None}) + for attempt in run_with_db_retries(): + with attempt: + session.query(TaskInstance).filter( + TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None) + ).update({TaskInstance.trigger_id: None}) # Get all triggers that have no task instances depending on them... ids = [ trigger_id