Skip to content
1 change: 1 addition & 0 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def _initialize_map() -> dict[str, Callable]:

functions: list[Callable] = [
DagFileProcessor.update_import_errors,
DagFileProcessor.manage_slas,
DagModel.get_paused_dag_ids,
DagFileProcessorManager.clear_nonexistent_import_errors,
XCom.get_value,
Expand Down
33 changes: 24 additions & 9 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,10 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L
self._dag_directory = dag_directory
self.dag_warnings: set[tuple[str, str]] = set()

@classmethod
@internal_api_call
@provide_session
def manage_slas(self, dag: DAG, session: Session = None) -> None:
def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) -> None:
"""
Finding all tasks that have SLAs defined, and sending alert emails when needed.

Expand All @@ -375,9 +377,11 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
We are assuming that the scheduler runs often, so we only check for
tasks that should have succeeded in the past hour.
"""
self.log.info("Running SLA Checks for %s", dag.dag_id)
dagbag = DagFileProcessor._get_dagbag(dag_folder)
dag = dagbag.get_dag(dag_id)
cls.logger().info("Running SLA Checks for %s", dag.dag_id)
if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks):
self.log.info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
cls.logger().info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
return

qry = (
Expand Down Expand Up @@ -481,7 +485,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
else [dag.sla_miss_callback]
)
for callback in callbacks:
self.log.info("Calling SLA miss callback %s", callback)
cls.logger().info("Calling SLA miss callback %s", callback)
try:
callback(dag, task_list, blocking_task_list, slas, blocking_tis)
notification_sent = True
Expand All @@ -493,7 +497,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
"func_name": callback.__name__,
},
)
self.log.exception(
cls.logger().exception(
"Could not call sla_miss_callback(%s) for DAG %s",
callback.__name__,
dag.dag_id,
Expand All @@ -512,7 +516,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
task = dag.get_task(sla.task_id)
except TaskNotFound:
# task already deleted from DAG, skip it
self.log.warning(
cls.logger().warning(
"Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id
)
continue
Expand All @@ -532,7 +536,9 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
notification_sent = True
except Exception:
Stats.incr("sla_email_notification_failure", tags={"dag_id": dag.dag_id})
self.log.exception("Could not send SLA Miss email notification for DAG %s", dag.dag_id)
cls.logger().exception(
"Could not send SLA Miss email notification for DAG %s", dag.dag_id
)
# If we sent any notification, update the sla_miss table
if notification_sent:
for sla in slas:
Expand Down Expand Up @@ -652,7 +658,7 @@ def execute_callbacks(
if isinstance(request, TaskCallbackRequest):
self._execute_task_callbacks(dagbag, request, session=session)
elif isinstance(request, SlaCallbackRequest):
self.manage_slas(dagbag.get_dag(request.dag_id), session=session)
DagFileProcessor.manage_slas(dagbag.dag_folder, request.dag_id, session=session)
elif isinstance(request, DagCallbackRequest):
self._execute_dag_callbacks(dagbag, request, session)
except Exception:
Expand Down Expand Up @@ -736,6 +742,15 @@ def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRe
self.log.info("Executed failure callback for %s in state %s", ti, ti.state)
session.flush()

@classmethod
def _get_dagbag(cls, file_path: str):
try:
return DagBag(file_path, include_examples=False)
except Exception:
cls.logger().exception("Failed at reloading the DAG file %s", file_path)
Stats.incr("dag_file_refresh_error", 1, 1)
raise

@provide_session
def process_file(
self,
Expand Down Expand Up @@ -766,7 +781,7 @@ def process_file(
self.log.info("Processing file %s for tasks to queue", file_path)

try:
dagbag = DagBag(file_path, include_examples=False)
dagbag = DagFileProcessor._get_dagbag(file_path)
except Exception:
self.log.exception("Failed at reloading the DAG file %s", file_path)
Stats.incr("dag_file_refresh_error", 1, 1, tags={"file_path": file_path})
Expand Down
20 changes: 16 additions & 4 deletions airflow/utils/log/logging_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import sys
from io import IOBase
from logging import Handler, Logger, StreamHandler
from typing import IO, cast
from typing import IO, Any, TypeVar, cast

from airflow.settings import IS_K8S_EXECUTOR_POD

Expand Down Expand Up @@ -59,6 +59,9 @@ def remove_escape_codes(text: str) -> str:
return ANSI_ESCAPE.sub("", text)


_T = TypeVar("_T")


class LoggingMixin:
"""Convenience super-class to have a logger configured with the class name"""

Expand All @@ -67,12 +70,21 @@ class LoggingMixin:
def __init__(self, context=None):
self._set_context(context)

@staticmethod
def _get_log(obj: Any, clazz: type[_T]) -> Logger:
if obj._log is None:
obj._log = logging.getLogger(f"{clazz.__module__}.{clazz.__name__}")
return obj._log

@classmethod
def logger(cls) -> Logger:
"""Returns a logger."""
return LoggingMixin._get_log(cls, cls)

@property
def log(self) -> Logger:
"""Returns a logger."""
if self._log is None:
self._log = logging.getLogger(self.__class__.__module__ + "." + self.__class__.__name__)
return self._log
return LoggingMixin._get_log(self, self.__class__)

def _set_context(self, context):
if context is not None:
Expand Down
Loading